Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
inputs = {}
prior_session_count = 0
# get scans
scans = xnat.get('scans', list())
for s in scans:
name = ProcessorParser._input_name(s)
select = s.get('select', None)
parsed_select = ProcessorParser._parse_select(select)
session_select = s.get('select-session', None)
parsed_session_select =\
ProcessorParser._parse_session_select(session_select)
ProcessorParser._register_iteration_references(
name,
parsed_select,
iteration_sources,
iteration_map)
types = [_.strip() for _ in s['types'].split(',')]
ProcessorParser._register_input_types(types, inputs_by_type, name)
resources = s.get('resources', [])
artefact_required = False
for r in resources:
r['required'] = r.get('required', True)
artefact_required = artefact_required or r['required']
# get inputs: pass 1
input_dict = yaml_source['inputs']
xnat = input_dict['xnat']
if xnat == None:
raise ValueError(
'yaml processor is missing xnat keyword contents')
inputs = {}
prior_session_count = 0
# get scans
scans = xnat.get('scans', list())
for s in scans:
name = ProcessorParser._input_name(s)
select = s.get('select', None)
parsed_select = ProcessorParser._parse_select(select)
session_select = s.get('select-session', None)
parsed_session_select =\
ProcessorParser._parse_session_select(session_select)
ProcessorParser._register_iteration_references(
name,
parsed_select,
iteration_sources,
iteration_map)
types = [_.strip() for _ in s['types'].split(',')]
ProcessorParser._register_input_types(types, inputs_by_type, name)
select = a.get('select', None)
parsed_select = ProcessorParser._parse_select(select)
session_select = a.get('select-session', None)
parsed_session_select =\
ProcessorParser._parse_session_select(session_select)
ProcessorParser._register_iteration_references(
name,
parsed_select,
iteration_sources,
iteration_map)
types = [_.strip() for _ in a['proctypes'].split(',')]
ProcessorParser._register_input_types(types, inputs_by_type, name)
resources = a.get('resources', [])
artefact_required = False
for r in resources:
r['required'] = r.get('required', True)
artefact_required = artefact_required or r['required']
inputs[name] = {
'types': types,
'select': parsed_select,
'select-session': parsed_session_select,
'artefact_type': 'assessor',
'needs_qc': a.get('needs_qc', False),
'resources': a.get('resources', []),
'required': artefact_required
}
prior_session_count = 0
# get scans
scans = xnat.get('scans', list())
for s in scans:
name = ProcessorParser._input_name(s)
select = s.get('select', None)
parsed_select = ProcessorParser._parse_select(select)
session_select = s.get('select-session', None)
parsed_session_select =\
ProcessorParser._parse_session_select(session_select)
ProcessorParser._register_iteration_references(
name,
parsed_select,
iteration_sources,
iteration_map)
types = [_.strip() for _ in s['types'].split(',')]
ProcessorParser._register_input_types(types, inputs_by_type, name)
resources = s.get('resources', [])
artefact_required = False
for r in resources:
r['required'] = r.get('required', True)
artefact_required = artefact_required or r['required']
inputs[name] = {
'types': types,
artefacts_by_input = \
ProcessorParser.map_artefacts_to_inputs(ordered_sessions,
self.inputs,
self.inputs_by_type)
parameter_matrix = \
ProcessorParser.generate_parameter_matrix(
self.inputs,
self.iteration_sources,
self.iteration_map,
artefacts,
artefacts_by_input)
assessor_parameter_map = \
ProcessorParser.compare_to_existing(ordered_sessions,
self.proctype,
parameter_matrix)
self.csess = csess
self.artefacts = artefacts
self.artefacts_by_input = artefacts_by_input
self.parameter_matrix = parameter_matrix
self.assessor_parameter_map = assessor_parameter_map
csess.project_id(),
csess.subject_id(),
s.label())
for s in subj.experiments()]
x = [TimestampSession(s.creation_timestamp(), s) for s in x]
ordered_sessions = map(lambda y: y.session,
sorted(x,
key=lambda v: v.timestamp,
reverse=True))
ordered_sessions =\
filter(
lambda y: y.creation_timestamp() <= csess.creation_timestamp(),
ordered_sessions)
artefacts = ProcessorParser.parse_artefacts(ordered_sessions)
artefacts_by_input = \
ProcessorParser.map_artefacts_to_inputs(ordered_sessions,
self.inputs,
self.inputs_by_type)
parameter_matrix = \
ProcessorParser.generate_parameter_matrix(
self.inputs,
self.iteration_sources,
self.iteration_map,
artefacts,
artefacts_by_input)
assessor_parameter_map = \
ProcessorParser.compare_to_existing(ordered_sessions,
raise AutoProcessorError("Parameter 'xnat' must be provided")
if not yaml_source:
raise AutoProcessorError(
"Parameter 'yaml_source' must be provided")
self.xnat = xnat
self.user_overrides = dict()
self.extra_user_overrides = dict()
self._read_yaml(yaml_source)
# Edit the values from user inputs:
if user_inputs is not None:
self._edit_inputs(user_inputs, yaml_source)
self.parser = processor_parser.ProcessorParser(
yaml_source.contents, self.proctype)
# Set up attrs:
self.walltime_str = self.attrs.get('walltime')
self.memreq_mb = self.attrs.get('memory')
self.ppn = self.attrs.get('ppn', 1)
self.env = self.attrs.get('env', None)
self.xsitype = self.attrs.get('xsitype', 'proc:genProcData')
self.full_regex = self.attrs.get('fullregex', False)
self.suffix = self.attrs.get('suffix', None)
def __init__(self, yaml_source, proctype=None):
    """Build the parser state from a YAML processor description.

    Args:
        yaml_source: mapping-like object indexed here by the keys
            'attrs' and 'inputs'; it is also handed unchanged to
            ``ProcessorParser.parse_inputs``.
        proctype: optional processor type. When falsy, the type is taken
            from the first element returned by
            ``XnatUtils.get_proctype`` for the default spider path.
    """
    self.yaml_source = yaml_source

    # parse_inputs yields five values, unpacked positionally.
    (self.inputs,
     self.inputs_by_type,
     self.iteration_sources,
     self.iteration_map,
     self.prior_session_count) = ProcessorParser.parse_inputs(yaml_source)
    self.variables_to_inputs = ProcessorParser.parse_variables(self.inputs)

    # Per-session results start out unset.
    self.csess = \
        self.artefacts = \
        self.artefacts_by_input = \
        self.parameter_matrix = \
        self.assessor_parameter_map = None

    attrs = yaml_source['attrs']
    self.xsitype = attrs.get('xsitype', 'proc:genProcData')

    # Fall back to the spider path only when no proctype was supplied.
    if not proctype:
        spider_path = yaml_source['inputs']['default']['spider_path']
        proctype = XnatUtils.get_proctype(spider_path)[0]
    self.proctype = proctype
def __init__(self, yaml_source, proctype=None):
    """Build the parser state from a YAML processor description.

    Args:
        yaml_source: mapping-like object indexed here by the keys
            'attrs' and 'inputs'; it is also handed unchanged to
            ``ProcessorParser.parse_inputs``.
        proctype: optional processor type. When falsy, the type is taken
            from the first element returned by
            ``XnatUtils.get_proctype`` for the default spider path.
    """
    self.yaml_source = yaml_source

    # parse_inputs yields five values, unpacked positionally.
    (self.inputs,
     self.inputs_by_type,
     self.iteration_sources,
     self.iteration_map,
     self.prior_session_count) = ProcessorParser.parse_inputs(yaml_source)
    self.variables_to_inputs = ProcessorParser.parse_variables(self.inputs)

    # Per-session results start out unset.
    self.csess = \
        self.artefacts = \
        self.artefacts_by_input = \
        self.parameter_matrix = \
        self.assessor_parameter_map = None

    attrs = yaml_source['attrs']
    self.xsitype = attrs.get('xsitype', 'proc:genProcData')

    # Fall back to the spider path only when no proctype was supplied.
    if not proctype:
        spider_path = yaml_source['inputs']['default']['spider_path']
        proctype = XnatUtils.get_proctype(spider_path)[0]
    self.proctype = proctype