Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_dependencies(self):
class TaskA(b2luigi.Task):
some_parameter = b2luigi.IntParameter()
def output(self):
yield self.add_to_output("file_a")
@b2luigi.requires(TaskA)
class TaskB(b2luigi.Task):
def output(self):
yield self.add_to_output("file_b")
task = TaskB(some_parameter=42)
self.assertEqual(get_filled_params(task), {"some_parameter": 42})
self.assertEqual(len(task._get_input_targets("file_a")), 1)
self.assertEqual(len(task.get_input_file_names("file_a")), 1)
self.assertEqual(len(task.get_input_file_names().keys()), 1)
self.assertEqual(task._get_input_targets("file_a")[0].path, task.get_input_file_names("file_a")[0])
self.assertEqual(task._get_output_target("file_b").path, task.get_output_file_name("file_b"))
self.assertIn("file_b", task.get_output_file_name("file_b"))
self.assertIn("some_parameter=42", task.get_output_file_name("file_b"))
def test_requires(self):
class TaskA(b2luigi.Task):
some_parameter = b2luigi.IntParameter()
some_other_parameter = b2luigi.IntParameter()
def output(self):
yield self.add_to_output("test.txt")
@b2luigi.requires(TaskA, some_parameter=3)
class TaskB(b2luigi.Task):
another_parameter = b2luigi.IntParameter()
def output(self):
yield self.add_to_output("out.dat")
task = TaskB(some_other_parameter=1, another_parameter=42)
self.assertEqual(sorted(task.get_param_names()), ["another_parameter", "some_other_parameter"])
self.assertEqual(task.another_parameter, 42)
self.assertEqual(task.some_other_parameter, 1)
self.assertTrue(task.get_output_file_name("out.dat").endswith("results/some_other_parameter=1/another_parameter=42/out.dat"))
input_files = task.get_input_file_names("test.txt")
self.assertEqual(len(input_files), 1)
self.assertTrue(input_files[0].endswith("results/some_parameter=3/some_other_parameter=1/test.txt"))
def create_path(self):
path = basf2.create_path()
path.add_module('RootInput', inputFileNames=self.get_input_file_names("simulation_full_output.root"))
modularAnalysis.loadGearbox(path)
reconstruction.add_reconstruction(path)
modularAnalysis.outputMdst(self.get_output_file_name("reconstructed_output.root"), path=path)
return path
def output(self):
yield self.add_to_output("reconstructed_output.root")
@luigi.requires(ReconstructionTask)
class AnalysisTask(Basf2PathTask):
def create_path(self):
path = basf2.create_path()
modularAnalysis.inputMdstList('default', self.get_input_file_names("reconstructed_output.root"), path=path)
modularAnalysis.fillParticleLists([('K+', 'kaonID > 0.1'), ('pi+', 'pionID > 0.1')], path=path)
modularAnalysis.reconstructDecay('D0 -> K- pi+', '1.7 < M < 1.9', path=path)
modularAnalysis.matchMCTruth('D0', path=path)
modularAnalysis.reconstructDecay('B- -> D0 pi-', '5.2 < Mbc < 5.3', path=path)
try: # treeFit is the new function name in light releases after release 4 (e.g. light-2002-janus)
vertex.treeFit('B+', 0.1, update_all_daughters=True, path=path)
except AttributeError: # vertexTree is the function name in release 4
vertex.vertexTree('B+', 0.1, update_all_daughters=True, path=path)
modularAnalysis.matchMCTruth('B-', path=path)
modularAnalysis.variablesToNtuple('D0',
['M', 'p', 'E', 'useCMSFrame(p)', 'useCMSFrame(E)',
'daughter(0, kaonID)', 'daughter(1, pionID)', 'isSignal', 'mcErrors'],
else:
raise ValueError(f"Event type {self.event_type} is not valid. It should be either 'Y(4S)' or 'Continuum'!")
generators.add_evtgen_generator(path, 'signal', dec_file)
modularAnalysis.loadGearbox(path)
simulation.add_simulation(path)
path.add_module('RootOutput', outputFileName=self.get_output_file_name('simulation_full_output.root'))
return path
def output(self):
yield self.add_to_output("simulation_full_output.root")
@luigi.requires(SimulationTask)
class ReconstructionTask(Basf2PathTask):
def create_path(self):
path = basf2.create_path()
path.add_module('RootInput', inputFileNames=self.get_input_file_names("simulation_full_output.root"))
modularAnalysis.loadGearbox(path)
reconstruction.add_reconstruction(path)
modularAnalysis.outputMdst(self.get_output_file_name("reconstructed_output.root"), path=path)
return path
def output(self):
yield self.add_to_output("reconstructed_output.root")
class MdstDataTask(DstDataTask):
data_mode = DataMode.mdst
def output(self):
yield {"output.root": b2luigi.LocalTarget(_build_data_path(self))}
class CdstDataTask(DstDataTask):
data_mode = DataMode.cdst
def output(self):
yield {"output.root": b2luigi.LocalTarget(_build_data_path(self))}
requires_raw_data = b2luigi.requires(RawDataTask)
requires_skimmed_raw_data = b2luigi.requires(SkimmedRawDataTask)
requires_mdst_data = b2luigi.requires(MdstDataTask)
requires_cdst_data = b2luigi.requires(CdstDataTask)
def _get_dir_structure(data_mode):
if data_mode == DataMode.mdst:
return b2luigi.get_setting("mdst_dir_structure",
"/hsm/belle2/bdata/Data/release-{p.release}/DB{p.database:08d}/prod{p.prod:08d}/" + \
"e{p.experiment_number:04d}/4S/r{p.run_number:05d}/all/mdst/sub00/" + \
"mdst.{p.prefix}.{p.experiment_number:04d}.{p.run_number:05d}.{p.file_name}.root")
elif data_mode == DataMode.cdst:
return b2luigi.get_setting("cdst_dir_structure",
"/hsm/belle2/bdata/Data/release-{p.release}/DB{p.database:08d}/prod{p.prod:08d}/" + \
"e{p.experiment_number:04d}/4S/r{p.run_number:05d}/all/cdst/sub00/" + \
"cdst.{p.prefix}.{p.experiment_number:04d}.{p.run_number:05d}.{p.file_name}.root")
elif data_mode == DataMode.skimmed_raw:
class MdstDataTask(DstDataTask):
data_mode = DataMode.mdst
def output(self):
yield {"output.root": b2luigi.LocalTarget(_build_data_path(self))}
class CdstDataTask(DstDataTask):
data_mode = DataMode.cdst
def output(self):
yield {"output.root": b2luigi.LocalTarget(_build_data_path(self))}
requires_raw_data = b2luigi.requires(RawDataTask)
requires_skimmed_raw_data = b2luigi.requires(SkimmedRawDataTask)
requires_mdst_data = b2luigi.requires(MdstDataTask)
requires_cdst_data = b2luigi.requires(CdstDataTask)
def _get_dir_structure(data_mode):
if data_mode == DataMode.mdst:
return b2luigi.get_setting("mdst_dir_structure",
"/hsm/belle2/bdata/Data/release-{p.release}/DB{p.database:08d}/prod{p.prod:08d}/" + \
"e{p.experiment_number:04d}/4S/r{p.run_number:05d}/all/mdst/sub00/" + \
"mdst.{p.prefix}.{p.experiment_number:04d}.{p.run_number:05d}.{p.file_name}.root")
elif data_mode == DataMode.cdst:
return b2luigi.get_setting("cdst_dir_structure",
"/hsm/belle2/bdata/Data/release-{p.release}/DB{p.database:08d}/prod{p.prod:08d}/" + \
"e{p.experiment_number:04d}/4S/r{p.run_number:05d}/all/cdst/sub00/" + \
"cdst.{p.prefix}.{p.experiment_number:04d}.{p.run_number:05d}.{p.file_name}.root")