Used to retrieve subsections of the ini file for
configuration options.
Returns
--------
veto_seg_files : FileList
List of veto segment files generated
"""
if tags is None:
tags = []
if runtime_names is None:
runtime_names = []
if in_workflow_names is None:
in_workflow_names = []
logging.info('Starting generation of veto files for analysis')
make_analysis_dir(out_dir)
start_time = workflow.analysis_time[0]
end_time = workflow.analysis_time[1]
save_veto_definer(workflow.cp, out_dir, tags)
now_cat_sets = []
for name in runtime_names:
cat_sets = parse_cat_ini_opt(workflow.cp.get_opt_tags(
'workflow-segments', name, tags))
now_cat_sets.extend(cat_sets)
now_cats = set()
for cset in now_cat_sets:
now_cats = now_cats.union(cset)
later_cat_sets = []
for name in in_workflow_names:
    cat_sets = parse_cat_ini_opt(workflow.cp.get_opt_tags(
        'workflow-segments', name, tags))
    later_cat_sets.extend(cat_sets)
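# Illustrative note (not from the source): each option value is parsed into
# a list of category sets, one set per configured group. `now_cats` then
# accumulates the union of every category level that must be generated at
# runtime, while `later_cat_sets` collects the groups whose segments are
# generated within the workflow itself.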
def rerank_coinc_followup(workflow, statmap_file, bank_file, out_dir, tags,
injection_file=None,
ranking_file=None):
make_analysis_dir(out_dir)
if not workflow.cp.has_section("workflow-rerank"):
logging.info("No reranking done in this workflow")
return statmap_file
else:
logging.info("Setting up reranking of candidates")
# Generate reduced data files (maybe this could also be used elsewhere?)
stores = FileList([])
for ifo in workflow.ifos:
make_analysis_dir('strain_files')
node = Executable(workflow.cp, 'strain_data_reduce', ifos=[ifo],
out_dir='strain_files').create_node()
node.add_opt('--gps-start-time', workflow.analysis_time[0])
node.add_opt('--gps-end-time', workflow.analysis_time[1])
if injection_file:
node.add_input_opt('--injection-file', injection_file)
fil = node.new_output_file_opt(workflow.analysis_time, '.hdf',
'--output-file', tags=tags)
stores.append(fil)
workflow += node
# Generate trigger input file
node = Executable(workflow.cp, 'rerank_trigger_input', ifos=workflow.ifos,
out_dir=out_dir, tags=tags).create_node()
node.add_input_opt('--statmap-file', statmap_file)
def merge_single_detector_hdf_files(workflow, bank_file, trigger_files,
                                    out_dir, tags=None):
    """Merge the single-detector trigger files into one HDF file per ifo."""
    if tags is None:
        tags = []
make_analysis_dir(out_dir)
out = FileList()
for ifo in workflow.ifos:
node = MergeExecutable(workflow.cp, 'hdf_trigger_merge',
ifos=ifo, out_dir=out_dir, tags=tags).create_node()
node.add_input_opt('--bank-file', bank_file)
node.add_input_list_opt('--trigger-files', trigger_files.find_output_with_ifo(ifo))
node.new_output_file_opt(workflow.analysis_time, '.hdf', '--output-file')
workflow += node
out += node.output_files
return out
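# Illustrative usage (not from the source): assuming `insp_files` is a
# FileList of per-detector inspiral trigger files and `hdf_bank` is the
# HDF template bank, the merge step might be wired into a workflow as:
#
#     merged_triggers = merge_single_detector_hdf_files(
#         workflow, hdf_bank, insp_files, 'triggers', tags=['FULL_DATA'])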
sci_avlble_file : SegFile
SegFile containing the analysable time after checks in the datafind
module are applied to the input segment list. For production runs this
is expected to be equal to the input segment list.
scienceSegs : Dictionary of ifo-keyed glue.segments.segmentlist instances
This contains the times that the workflow is expected to analyse. If
the updateSegmentTimes kwarg is given, this will be updated to reflect
any instances of missing data.
sci_avlble_name : string
The name with which the analysable time is stored in the
sci_avlble_file.
"""
if tags is None:
tags = []
logging.info("Entering datafind module")
make_analysis_dir(outputDir)
cp = workflow.cp
# Parse for options in ini file
datafind_method = cp.get_opt_tags("workflow-datafind",
"datafind-method", tags)
if cp.has_option_tags("workflow-datafind",
"datafind-check-segment-gaps", tags):
checkSegmentGaps = cp.get_opt_tags("workflow-datafind",
"datafind-check-segment-gaps", tags)
else:
checkSegmentGaps = "no_test"
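# The lookups in this module follow a "tagged option with fallback default"
# pattern, as in the check above and the one below. A minimal sketch (not
# part of the source) of a helper capturing it, assuming a PyCBC-style
# configuration object with has_option_tags/get_opt_tags methods:
def get_opt_with_default(cp, section, option, tags, default="no_test"):
    """Return the tagged option if it is set, otherwise the default."""
    if cp.has_option_tags(section, option, tags):
        return cp.get_opt_tags(section, option, tags)
    return default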
if cp.has_option_tags("workflow-datafind",
                      "datafind-check-frames-exist", tags):
    checkFramesExist = cp.get_opt_tags("workflow-datafind",
                                       "datafind-check-frames-exist", tags)
else:
    checkFramesExist = "no_test"
A list of the tagging strings that will be used for all jobs created
by this call to the workflow. This will be used in output names.
Returns
--------
inj_files : pycbc.workflow.core.FileList
The list of injection files created by this call.
inj_tags : list of strings
The tag corresponding to each injection file and used to uniquely
identify them. The FileList class contains functions to search
based on tags.
"""
if tags is None:
tags = []
logging.info("Entering injection module.")
make_analysis_dir(output_dir)
# Get full analysis segment for output file naming
full_segment = workflow.analysis_time
ifos = workflow.ifos
# Identify which injections to do by presence of sub-sections in
# the configuration file
inj_tags = []
inj_files = FileList([])
for section in workflow.cp.get_subsections(inj_section_name):
inj_tag = section.upper()
curr_tags = tags + [inj_tag]
# FIXME: Remove once fixed in pipedown
# TEMPORARILY we require inj tags to end in "INJ"
def merge_psds(workflow, files, ifo, out_dir, tags=None):
    """Merge the PSD files for a single detector into one HDF file."""
    make_analysis_dir(out_dir)
    tags = [] if tags is None else tags
node = MergePSDFiles(workflow.cp, 'merge_psds',
ifos=ifo, out_dir=out_dir,
tags=tags).create_node()
node.add_input_list_opt('--psd-files', files)
node.new_output_file_opt(workflow.analysis_time, '.hdf', '--output-file')
workflow += node
return node.output_files[0]
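# Illustrative usage (not from the source): merging the per-detector PSD
# files for each ifo, assuming `psd_files` is a FileList supporting
# find_output_with_ifo (as used elsewhere in this module):
#
#     merged_psds = FileList([])
#     for ifo in workflow.ifos:
#         merged_psds.append(merge_psds(
#             workflow, psd_files.find_output_with_ifo(ifo), ifo, 'psds'))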
def make_average_psd(workflow, psd_files, out_dir, tags=None,
                     output_fmt='.txt'):
    """Average the given PSD files over time, and over detectors when
    more than one ifo is present."""
make_analysis_dir(out_dir)
tags = [] if tags is None else tags
node = AvgPSDExecutable(workflow.cp, 'average_psd', ifos=workflow.ifos,
out_dir=out_dir, tags=tags).create_node()
node.add_input_list_opt('--input-files', psd_files)
if len(workflow.ifos) > 1:
node.new_output_file_opt(workflow.analysis_time, output_fmt,
'--detector-avg-file')
node.new_multiifo_output_list_opt('--time-avg-file', workflow.ifos,
workflow.analysis_time, output_fmt, tags=tags)
workflow += node
return node.output_files
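# Illustrative usage (not from the source): averaging the merged PSDs from
# the previous step, assuming `merged_psds` is a FileList of per-detector
# PSD files:
#
#     avg_psd_files = make_average_psd(workflow, merged_psds, 'psds',
#                                      output_fmt='.txt')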
def veto_injections(workflow, inj_file, veto_file, veto_name, out_dir,
                    tags=None):
    """Remove injections that fall within the vetoed times."""
    tags = [] if tags is None else tags
make_analysis_dir(out_dir)
node = Executable(workflow.cp, 'strip_injections', ifos=workflow.ifos,
out_dir=out_dir, tags=tags).create_node()
node.add_opt('--segment-name', veto_name)
node.add_input_opt('--veto-file', veto_file)
node.add_input_opt('--injection-file', inj_file)
node.add_opt('--ifos', ' '.join(workflow.ifos))
node.new_output_file_opt(workflow.analysis_time, '.xml', '--output-file')
workflow += node
return node.output_files[0]
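# Illustrative usage (not from the source): stripping injections that fall
# in vetoed time, assuming `inj_file` and `cat2_veto_file` were produced by
# earlier workflow steps and 'VETO_CAT2' names the segments to apply:
#
#     clean_inj_file = veto_injections(workflow, inj_file, cat2_veto_file,
#                                      'VETO_CAT2', 'injections')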
configuration options.
Returns
--------
sci_seg_file : workflow.core.SegFile instance
The segment file combined from all ifos containing the science segments.
sci_segs : Ifo keyed dict of ligo.segments.segmentlist instances
The science segs for each ifo, keyed by ifo
sci_seg_name : str
The name with which science segs are stored in the output XML file.
"""
if tags is None:
tags = []
logging.info('Starting generation of science segments')
make_analysis_dir(out_dir)
start_time = workflow.analysis_time[0]
end_time = workflow.analysis_time[1]
# NOTE: Should this be overrideable in the config file?
sci_seg_name = "SCIENCE"
sci_segs = {}
sci_seg_dict = segments.segmentlistdict()
sci_seg_summ_dict = segments.segmentlistdict()
for ifo in workflow.ifos:
curr_sci_segs, curr_sci_xml, curr_seg_name = get_sci_segs_for_ifo(ifo,
workflow.cp, start_time, end_time, out_dir, tags)
sci_seg_dict[ifo + ':' + sci_seg_name] = curr_sci_segs
sci_segs[ifo] = curr_sci_segs
sci_seg_summ_dict[ifo + ':' + sci_seg_name] = \
curr_sci_xml.seg_summ_dict[ifo + ':' + curr_seg_name]
def convert_bank_to_hdf(workflow, xmlbank, out_dir, tags=None):
    """Return the template bank in HDF format."""
if tags is None:
tags = []
#FIXME, make me not needed
if len(xmlbank) > 1:
raise ValueError('Can only convert a single template bank')
logging.info('Converting template bank to HDF')
make_analysis_dir(out_dir)
bank2hdf_exe = PyCBCBank2HDFExecutable(workflow.cp, 'bank2hdf',
ifos=workflow.ifos,
out_dir=out_dir, tags=tags)
bank2hdf_node = bank2hdf_exe.create_node(xmlbank[0])
workflow.add_node(bank2hdf_node)
return bank2hdf_node.output_files
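# Illustrative usage (not from the source): converting a single-element
# FileList `xml_bank` from the bank generation step; indexing [0] picks the
# resulting HDF file out of the returned FileList:
#
#     hdf_bank = convert_bank_to_hdf(workflow, xml_bank, 'bank')[0]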