Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
segs = trig_files.get_times_covered_by_files()
seg = segments.segment(segs[0][0], segs[-1][1])
node = Node(self)
node.set_memory(10000)
node.add_input_opt('--template-bank', bank_file)
node.add_input_list_opt('--trigger-files', trig_files)
if len(stat_files) > 0:
node.add_input_list_opt('--statistic-files', stat_files)
if veto_file is not None:
node.add_input_opt('--veto-files', veto_file)
node.add_opt('--segment-name', veto_name)
node.add_opt('--template-fraction-range', template_str)
node.new_output_file_opt(seg, '.hdf', '--output-file', tags=tags)
return node
class PyCBCFindMultiifoCoincExecutable(Executable):
"""Find coinc triggers using a folded interval method"""
current_retention_level = Executable.ALL_TRIGGERS
file_input_options = ['--statistic-files']
def create_node(self, trig_files, bank_file, stat_files, veto_file,
veto_name, template_str, pivot_ifo, fixed_ifo, tags=None):
if tags is None:
tags = []
segs = trig_files.get_times_covered_by_files()
seg = segments.segment(segs[0][0], segs[-1][1])
node = Node(self)
node.add_input_opt('--template-bank', bank_file)
node.add_input_list_opt('--trigger-files', trig_files)
if len(stat_files) > 0:
node.add_input_list_opt('--statistic-files', stat_files)
if veto_file is not None:
for ifo in workflow.ifos:
make_analysis_dir('strain_files')
node = Executable(workflow.cp, 'strain_data_reduce', ifos=[ifo],
out_dir='strain_files').create_node()
node.add_opt('--gps-start-time', workflow.analysis_time[0])
node.add_opt('--gps-end-time', workflow.analysis_time[1])
if injection_file:
node.add_input_opt('--injection-file', injection_file)
fil = node.new_output_file_opt(workflow.analysis_time, '.hdf',
'--output-file', tags=tags)
stores.append(fil)
workflow += node
# Generate trigger input file
node = Executable(workflow.cp, 'rerank_trigger_input', ifos=workflow.ifos,
out_dir=out_dir, tags=tags).create_node()
node.add_input_opt('--statmap-file', statmap_file)
node.add_input_opt('--bank-file', bank_file)
trigfil = node.new_output_file_opt(workflow.analysis_time, '.hdf',
'--output-file', tags=tags)
workflow += node
# Parallelize coinc trigger followup
factor = int(workflow.cp.get_opt_tags("workflow-rerank",
"parallelization-factor", tags))
exe = Executable(workflow.cp, 'coinc_followup', ifos=workflow.ifos,
out_dir=out_dir, tags=tags)
stat_files = FileList([])
for i in range(factor):
node = exe.create_node()
tags = [] if not tags else tags
exe = CalcPSDExecutable(workflow.cp, 'calculate_psd',
ifos=segment_file.ifo, out_dir=out_dir,
tags=tags)
node = exe.create_node()
node.add_input_opt('--analysis-segment-file', segment_file)
node.add_opt('--segment-name', segment_name)
if frame_files and not exe.has_opt('frame-type'):
node.add_input_list_opt('--frame-files', frame_files)
node.new_output_file_opt(workflow.analysis_time, '.hdf', '--output-file')
workflow += node
return node.output_files[0]
class AvgPSDExecutable(Executable):
current_retention_level = Executable.FINAL_RESULT
def make_average_psd(workflow, psd_files, out_dir, tags=None,
output_fmt='.txt'):
make_analysis_dir(out_dir)
tags = [] if tags is None else tags
node = AvgPSDExecutable(workflow.cp, 'average_psd', ifos=workflow.ifos,
out_dir=out_dir, tags=tags).create_node()
node.add_input_list_opt('--input-files', psd_files)
if len(workflow.ifos) > 1:
node.new_output_file_opt(workflow.analysis_time, output_fmt,
'--detector-avg-file')
node.new_multiifo_output_list_opt('--time-avg-file', workflow.ifos,
workflow.analysis_time, output_fmt, tags=tags)
exe = Executable(workflow.cp, 'coinc_followup', ifos=workflow.ifos,
out_dir=out_dir, tags=tags)
stat_files = FileList([])
for i in range(factor):
node = exe.create_node()
node.new_output_file_opt(workflow.analysis_time, '.hdf',
'--output-file', tags=[str(i)])
node.add_multiifo_input_list_opt('--hdf-store', stores)
node.add_input_opt('--input-file', trigfil)
node.add_opt('--start-index', str(i))
node.add_opt('--stride', factor)
workflow += node
stat_files += node.output_files
exe = Executable(workflow.cp, 'rerank_coincs', ifos=workflow.ifos,
out_dir=out_dir, tags=tags)
node = exe.create_node()
node.add_input_list_opt('--stat-files', stat_files)
node.add_input_opt('--statmap-file', statmap_file)
node.add_input_opt('--followup-file', trigfil)
if ranking_file:
node.add_input_opt('--ranking-file', ranking_file)
node.new_output_file_opt(workflow.analysis_time, '.hdf',
'--output-file')
workflow += node
return node.output_files[0]
tags=None):
if tags is None:
tags = []
Executable.__init__(self, cp, exe_name, universe, ifo, out_dir,
tags=tags)
self.set_num_cpus(4)
self.set_memory('8000')
def create_node(self, job_segment, likelihood_file, horizon_dist_file):
node = Node(self)
node.add_input_opt('--likelihood-file', likelihood_file)
node.add_input_opt('--horizon-dist-file', horizon_dist_file)
node.new_output_file_opt(job_segment, '.xml.gz', '--output-file',
tags=self.tags, store_file=self.retain_files)
return node
class PycbcCalculateLikelihoodExecutable(Executable):
"""
The class responsible for running the pycbc_calculate_likelihood
executable which is part 4 of 4 of the gstlal_inspiral_calc_likelihood
functionality
"""
current_retention_level = Executable.FINAL_RESULT
def __init__(self, cp, exe_name, universe=None, ifo=None, out_dir=None,
tags=None):
if tags is None:
tags = []
Executable.__init__(self, cp, exe_name, universe, ifo, out_dir,
tags=tags)
def create_node(self, job_segment, trigger_file, likelihood_file,
horizon_dist_file):
node = Node(self)
# Need to follow through the split job tag if present
""" This class is used to create nodes for the ligolw_combine_segments
Executable
"""
# Always want to keep the segments
current_retention_level = Executable.FINAL_RESULT
def create_node(self, valid_seg, veto_files, segment_name):
node = Node(self)
node.add_opt('--segment-name', segment_name)
for fil in veto_files:
node.add_input_arg(fil)
node.new_output_file_opt(valid_seg, '.xml', '--output',
store_file=self.retain_files)
return node
class LigolwAddExecutable(Executable):
""" The class used to create nodes for the ligolw_add Executable. """
current_retention_level = Executable.INTERMEDIATE_PRODUCT
def __init__(self, *args, **kwargs):
super(LigolwAddExecutable, self).__init__(*args, **kwargs)
self.set_memory(2000)
def create_node(self, jobSegment, input_files, output=None,
use_tmp_subdirs=True, tags=None):
if tags is None:
tags = []
node = Node(self)
# Very few options to ligolw_add, all input files are given as a long
# argument list. If this becomes unwieldy we could dump all these files
# to a cache file and read that in. ALL INPUT FILES MUST BE LISTED AS
extra_tags.append(tag)
node = Node(self)
node.add_input_opt('--input', input_file)
node.new_output_file_opt(job_segment, '.sqlite', '--output',
tags=self.tags+extra_tags, store_file=self.retain_files)
return node
class PycbcCalculateFarExecutable(SQLInOutExecutable):
"""
The class responsible for making jobs for the FAR calculation code. This
only raises the default retention level
"""
current_retention_level=Executable.FINAL_RESULT
class ExtractToXMLExecutable(Executable):
"""
This class is responsible for running ligolw_sqlite jobs that will take an
SQL file and dump it back to XML.
"""
current_retention_level = Executable.INTERMEDIATE_PRODUCT
def __init__(self, cp, exe_name, universe=None, ifo=None, out_dir=None,
tags=None):
if tags is None:
tags = []
Executable.__init__(self, cp, exe_name, universe, ifo, out_dir,
tags=tags)
def create_node(self, job_segment, input_file):
node = Node(self)
node.add_input_opt('--database', input_file)
node.new_output_file_opt(job_segment, '.xml', '--extract',
tags=self.tags, store_file=self.retain_files)
The directory in which output files will be stored.
tags : list of strings (optional, default = [])
A list of the tagging strings that will be used for all jobs created
by this call to the workflow. An example might be ['full_data'].
This will be used in output names and directories.
Returns
--------
plot_files : ahope.FileList
A list of the output files from this stage.
"""
plot_files = FileList([])
# create executable
plotnumtemplates_job = Executable(workflow.cp, 'plotnumtemplates',
'vanilla', workflow.ifos, output_dir, tags)
for tag in tags:
# create node
node = Node(plotnumtemplates_job)
node.add_opt('--gps-start-time', workflow.analysis_time[0])
node.add_opt('--gps-end-time', workflow.analysis_time[1])
node.add_opt('--cache-file', cache_filename)
node.add_opt('--ifo-times', node.executable.ifo_string)
node.add_opt('--user-tag', tag.upper()+'_SUMMARY_PLOTS')
node.add_opt('--output-path', output_dir)
node.add_opt('--bank-pattern', tmpltbank_cachepattern)
node.add_opt('--enable-output')
# add node to workflow
workflow.add_node(node)
if tags is None:
tags = []
segs = coinc_files.get_times_covered_by_files()
seg = segments.segment(segs[0][0], segs[-1][1])
node = Node(self)
node.set_memory(5000)
node.add_input_list_opt('--coinc-files', coinc_files)
node.add_opt('--ifos', ifos)
node.new_output_file_opt(seg, '.hdf', '--output-file', tags=tags)
return node
class PyCBCMultiifoStatMapInjExecutable(Executable):
"""Calculate FAP, IFAR, etc"""
current_retention_level = Executable.MERGED_TRIGGERS
def create_node(self, zerolag, full_data,
injfull, fullinj, ifos, tags=None):
if tags is None:
tags = []
segs = zerolag.get_times_covered_by_files()
seg = segments.segment(segs[0][0], segs[-1][1])
node = Node(self)
node.set_memory(5000)
node.add_input_list_opt('--zero-lag-coincs', zerolag)
if isinstance(full_data, list):
node.add_input_list_opt('--full-data-background', full_data)
else:
node.add_input_opt('--full-data-background', full_data)
# currently ignored.
curr_tags = bank.tags + [curr_tag] + tags
job_tag = bank.description + "_" + self.name.upper()
out_file = File(bank.ifo_list, job_tag, bank.segment,
extension=self.extension, directory=self.out_dir,
tags=curr_tags, store_file=self.retain_files)
out_files.append(out_file)
node.add_output_list_opt('--output-filenames', out_files)
return node
class PycbcSplitBankXmlExecutable(PycbcSplitBankExecutable):
""" Subclass resonsible for creating jobs for pycbc_splitbank. """
extension='.xml.gz'
class PycbcConditionStrainExecutable(Executable):
""" The class responsible for creating jobs for pycbc_condition_strain. """
current_retention_level = Executable.ALL_TRIGGERS
def __init__(self, cp, exe_name, ifo=None, out_dir=None, universe=None,
tags=None):
super(PycbcConditionStrainExecutable, self).__init__(cp, exe_name, universe,
ifo, out_dir, tags)
def create_node(self, input_files, tags=None):
if tags is None:
tags = []
node = Node(self)
start_time = self.cp.get("workflow", "start-time")
end_time = self.cp.get("workflow", "end-time")
node.add_opt('--gps-start-time', start_time)
node.add_opt('--gps-end-time', end_time)