Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Build a node for this job that takes `parent` as its '--input-file' and
# prepares one output File per split (self.num_splits of them).
# NOTE(review): this excerpt has lost its indentation and is cut off inside
# the loop below; the remainder of the loop body and any return statement
# are not visible here.
def create_node(self, parent, tags=None):
# Normalise the optional tag list. In the visible part of the function
# `tags` is not used again.
if tags is None:
tags = []
node = Node(self)
node.add_input_opt('--input-file', parent)
# The output extension follows the parent's name: names ending in "gz"
# produce ".xml.gz", everything else produces ".xml".
if parent.name.endswith("gz"):
ext = ".xml.gz"
else:
ext = ".xml"
# `out_files` starts empty; presumably filled in the part of the loop that
# is not shown -- TODO confirm.
out_files = FileList([])
for i in range(self.num_splits):
# Each split gets the parent's tags plus its own 'split<i>' tag.
curr_tag = 'split%d' % i
curr_tags = parent.tags + [curr_tag]
# Description used for the output file: parent description, underscore,
# upper-cased job name.
job_tag = parent.description + "_" + self.name.upper()
out_file = File(parent.ifo_list, job_tag, parent.segment,
extension=ext, directory=self.out_dir,
tags=curr_tags, store_file=self.retain_files)
execute_now : boolean, optional
If true, jobs are executed immediately. If false, they are added to the
workflow to be run later.
Returns
--------
veto_def_file : pycbc.workflow.core.SegFile
The workflow File object corresponding to this DQ veto file.
"""
if tags is None:
tags = []
seg_valid_seg = segments.segment([start_time,end_time])
# FIXME: This job needs an internet connection and X509_USER_PROXY
# For internet connection, it may need a headnode (ie universe local)
# For X509_USER_PROXY, I don't know what pegasus is doing
node = Node(veto_gen_job)
node.add_opt('--veto-categories', str(category))
node.add_opt('--ifo-list', ifo)
node.add_opt('--gps-start-time', str(start_time))
node.add_opt('--gps-end-time', str(end_time))
if tags:
veto_xml_file_name = "%s-VETOTIME_CAT%d_%s-%d-%d.xml" \
%(ifo, category, '_'.join(tags), start_time,
end_time-start_time)
else:
veto_xml_file_name = "%s-VETOTIME_CAT%d-%d-%d.xml" \
%(ifo, category, start_time, end_time-start_time)
veto_xml_file_path = os.path.abspath(os.path.join(out_dir,
veto_xml_file_name))
curr_url = urlunparse(['file', 'localhost',
veto_xml_file_path, None, None, None])
if tags:
def create_node(self, job_segment, input_file):
    """Create a node that consumes ``input_file`` and emits an '.xml.gz' file.

    Parameters
    ----------
    job_segment
        Segment handed to ``new_output_file_opt`` for the output file.
    input_file
        File registered on the node through ``add_input_arg``.

    Returns
    -------
    Node
        The configured node, with its output attached to ``--output``.
    """
    new_node = Node(self)
    new_node.add_input_arg(input_file)

    # Output tagging and retention follow this job's own settings.
    output_settings = {
        'tags': self.tags,
        'store_file': self.retain_files,
    }
    new_node.new_output_file_opt(job_segment, '.xml.gz', '--output',
                                 **output_settings)
    return new_node
Returns
--------
plot_files : ahope.FileList
A list of the output files from this stage.
"""
plot_files = FileList([])
# create executable
plotnumtemplates_job = Executable(workflow.cp, 'plotnumtemplates',
'vanilla', workflow.ifos, output_dir, tags)
for tag in tags:
# create node
node = Node(plotnumtemplates_job)
node.add_opt('--gps-start-time', workflow.analysis_time[0])
node.add_opt('--gps-end-time', workflow.analysis_time[1])
node.add_opt('--cache-file', cache_filename)
node.add_opt('--ifo-times', node.executable.ifo_string)
node.add_opt('--user-tag', tag.upper()+'_SUMMARY_PLOTS')
node.add_opt('--output-path', output_dir)
node.add_opt('--bank-pattern', tmpltbank_cachepattern)
node.add_opt('--enable-output')
# add node to workflow
workflow.add_node(node)
# make all input_files parents
#for f in input_files:
# dep = dax.Dependency(parent=f.node._dax_node, child=node._dax_node)
# workflow._adag.addDependency(dep)
def create_node(self, inj_coinc_file, inj_xml_file, veto_file, veto_name,
                tags=None):
    """Create a node that combines trigger and injection files into '.hdf'.

    Parameters
    ----------
    inj_coinc_file
        Files passed as a list to ``--trigger-file``.
    inj_xml_file
        Files passed as a list to ``--injection-file``. The segment of the
        first entry is used for the output file, so it must be non-empty.
    veto_file
        File passed to ``--veto-file``; only used when ``veto_name`` is
        not None.
    veto_name
        Value for ``--segment-name``. When None, neither veto option is
        added to the node.
    tags : list, optional
        Tags forwarded to the output file. Defaults to an empty list.

    Returns
    -------
    Node
        The configured node, with its output attached to ``--output-file``.
    """
    # Use None as the default and build a fresh list per call: a literal
    # ``[]`` default is a single object shared by every call of the function.
    if tags is None:
        tags = []

    node = Node(self)
    node.add_input_list_opt('--trigger-file', inj_coinc_file)
    node.add_input_list_opt('--injection-file', inj_xml_file)

    # The veto file and its segment name are only meaningful together.
    if veto_name is not None:
        node.add_input_opt('--veto-file', veto_file)
        node.add_opt('--segment-name', veto_name)

    node.new_output_file_opt(inj_xml_file[0].segment, '.hdf',
                             '--output-file', tags=tags)
    return node
def create_node(self, statmap_file, other_statmap_files, tags=None):
    """Create a node that takes one statmap file plus a list of others.

    Parameters
    ----------
    statmap_file
        File passed to ``--statmap-file``; its ``segment`` is also used for
        the output file.
    other_statmap_files
        Files passed as a list to ``--other-statmap-files``.
    tags : list, optional
        Tags forwarded to the output file. Defaults to an empty list.

    Returns
    -------
    Node
        The configured node, with an '.hdf' output on ``--output-file``.
    """
    if tags is None:
        tags = []

    node = Node(self)
    node.add_input_opt('--statmap-file', statmap_file)
    node.add_input_list_opt('--other-statmap-files',
                            other_statmap_files)
    # Forward the caller's tags. Previously a literal ``tags=None`` was
    # passed here, so the ``tags`` argument was accepted and then dropped.
    node.new_output_file_opt(statmap_file.segment, '.hdf',
                             '--output-file', tags=tags)
    return node
# Build a node whose options describe the GRB trigger configured in the
# 'workflow' section of self.cp, plus optional injection-tag and segment-plot
# options.
# NOTE(review): this excerpt has lost its indentation and appears to be cut
# off: `parent`, `c_file`, `open_box`, `html_dir` and `tags` are not used in
# the visible lines and no return statement is shown.
def create_node(self, parent=None, c_file=None, open_box=False,
seg_plot=None, tuning_tags=None, exclusion_tags=None,
html_dir=None, tags=None):
if tags is None:
tags = []
node = Node(self)
# Trigger name, time and sky position are read from the 'workflow' section
# of the configuration held in self.cp.
node.add_opt('--grb-name', self.cp.get('workflow', 'trigger-name'))
node.add_opt('--start-time', self.cp.get('workflow', 'trigger-time'))
node.add_opt('--ra', self.cp.get('workflow', 'ra'))
node.add_opt('--dec', self.cp.get('workflow', 'dec'))
node.add_opt('--ifo-tag', self.ifos)
# Each optional tag collection is passed as one comma-separated string and
# is omitted entirely when the argument is None.
if tuning_tags is not None:
node.add_opt('--tuning-injections', ','.join(tuning_tags))
if exclusion_tags is not None:
node.add_opt('--exclusion-injections', ','.join(exclusion_tags))
# The segment plot is referenced by its storage_path attribute.
if seg_plot is not None:
node.add_opt('--seg-plot', seg_plot.storage_path)
def create_node(self, parent, segment, tags=None):
    """Create a node that reads ``parent`` and writes a stored '.xml' file.

    Parameters
    ----------
    parent
        Input file for ``--input-file``; its ``ifo_list`` is reused for the
        output file. Must be truthy.
    segment
        Segment given to the output file.
    tags : list, optional
        Tags for the output file. Defaults to an empty list.

    Returns
    -------
    Node
        The configured node, with its output attached to ``--output-file``.

    Raises
    ------
    ValueError
        If ``parent`` is falsy.
    """
    # Guard first: nothing is constructed without an input file.
    if not parent:
        raise ValueError("Must provide an input file.")

    file_tags = [] if tags is None else tags

    new_node = Node(self)
    new_node.add_input_opt('--input-file', parent)

    # The result is always stored (store_file=True) in this job's out_dir.
    result_file = File(
        parent.ifo_list,
        self.exe_name,
        segment,
        extension='.xml',
        store_file=True,
        directory=self.out_dir,
        tags=file_tags,
    )
    new_node.add_output_opt('--output-file', result_file)
    return new_node
def create_node(self, job_segment, input_file):
    """Create a node that reads ``--database`` and writes an '.xml' file.

    Parameters
    ----------
    job_segment
        Segment handed to ``new_output_file_opt`` for the output file.
    input_file
        File passed to the ``--database`` option.

    Returns
    -------
    Node
        The configured node, with its output attached to ``--extract``.
    """
    extract_node = Node(self)
    extract_node.add_input_opt('--database', input_file)

    # Output tagging and retention follow this job's own settings.
    output_settings = {
        'tags': self.tags,
        'store_file': self.retain_files,
    }
    extract_node.new_output_file_opt(job_segment, '.xml', '--extract',
                                     **output_settings)
    return extract_node
def create_node(self, job_segment, trigger_files):
    """Create a node that takes every trigger file and writes a '.pickle'.

    Parameters
    ----------
    job_segment
        Segment handed to ``new_output_file_opt`` for the output file.
    trigger_files
        Iterable of files; each is registered through ``add_input_arg`` in
        iteration order.

    Returns
    -------
    Node
        The configured node, with its output attached to ``--output-file``.
    """
    new_node = Node(self)

    # Register the inputs one by one, preserving the caller's ordering.
    for trigger_file in trigger_files:
        new_node.add_input_arg(trigger_file)

    output_settings = {
        'tags': self.tags,
        'store_file': self.retain_files,
    }
    new_node.new_output_file_opt(job_segment, '.pickle', '--output-file',
                                 **output_settings)
    return new_node