Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import b2luigi
import random
class MyNumberTask(b2luigi.Task):
    """Task that writes one random number to ``output_file.txt``."""

    # Task parameter; ``run`` does not read it.
    some_parameter = b2luigi.Parameter()

    def output(self):
        """Yield the single declared output, ``output_file.txt``."""
        yield self.add_to_output("output_file.txt")

    def run(self):
        """Draw a value with ``random.random()`` and write it on its own line."""
        drawn_value = random.random()
        target_path = self.get_output_file_name("output_file.txt")
        with open(target_path, "w") as out_file:
            out_file.write(f"{drawn_value}\n")
class MyAverageTask(b2luigi.Task):
    """Task that requires 100 ``MyNumberTask`` instances."""

    def requires(self):
        """Yield a ``MyNumberTask`` clone for each ``some_parameter`` in 0..99."""
        yield from (
            self.clone(MyNumberTask, some_parameter=index)
            for index in range(100)
        )
def wrap_parameter():
    """
    Monkey patch the parameter base class (and with it all other parameters)
    of luigi to include an additional "hashed" parameter in its constructor.

    Enabling this parameter will use a hashed version of the parameter value
    when creating file paths out of the parameters of a task instead of the
    value itself.
    This is especially useful when you have list, string or dict parameters,
    where the resulting file path may include "/" or "{}".

    ``hashed`` is keyword-only: every positional constructor argument is
    forwarded untouched to the original constructor instead of the first one
    being consumed as ``hashed``.
    """
    # Local imports keep this helper self-contained, so the digest code below
    # does not rely on a module-level ``import hashlib``.
    import hashlib

    import b2luigi

    parameter_class = b2luigi.Parameter

    def serialize_hashed(self, x):
        # The digest of the value's string form is a fixed-length hex string.
        return "hashed_" + hashlib.md5(str(x).encode()).hexdigest()

    # Keep a handle on the unpatched constructor so the wrapper can delegate.
    old_init = parameter_class.__init__

    def __init__(self, *args, hashed=False, **kwargs):
        old_init(self, *args, **kwargs)
        if hashed:
            # Only parameters created with hashed=True get the extra
            # ``serialize_hashed`` attribute.
            self.serialize_hashed = lambda x: serialize_hashed(self, x)

    parameter_class.__init__ = __init__
data_mode = b2luigi.EnumParameter(enum=DataMode)
experiment_number = b2luigi.IntParameter()
run_number = b2luigi.IntParameter()
prefix = b2luigi.Parameter()
file_name = b2luigi.Parameter()
class RawDataTask(DataTask):
    """``DataTask`` whose ``data_mode`` is fixed to ``DataMode.raw``."""

    data_mode = DataMode.raw

    def output(self):
        """Yield a dict mapping ``raw_output.root`` to a local target."""
        # The target path is whatever ``_build_data_path`` returns for this task.
        raw_target = b2luigi.LocalTarget(_build_data_path(self))
        yield {"raw_output.root": raw_target}
class DstDataTask(DataTask):
    """``DataTask`` with additional ``release``, ``prod`` and ``database`` parameters."""

    release = b2luigi.Parameter()
    prod = b2luigi.IntParameter()
    database = b2luigi.IntParameter()

    def output(self):
        """Yield a dict mapping ``full_output.root`` to a local target."""
        # The target path is whatever ``_build_data_path`` returns for this task.
        full_target = b2luigi.LocalTarget(_build_data_path(self))
        yield {"full_output.root": full_target}
class SkimmedRawDataTask(DstDataTask):
    """``DstDataTask`` whose ``data_mode`` is fixed to ``DataMode.skimmed_raw``."""

    data_mode = DataMode.skimmed_raw

    def output(self):
        """Yield a dict mapping ``raw_output.root`` to a local target."""
        # The target path is whatever ``_build_data_path`` returns for this task.
        skimmed_target = b2luigi.LocalTarget(_build_data_path(self))
        yield {"raw_output.root": skimmed_target}
class MdstDataTask(DstDataTask):
    """``DstDataTask`` whose ``data_mode`` is fixed to ``DataMode.mdst``."""

    # Only the data mode is set here; everything else comes from ``DstDataTask``.
    data_mode = DataMode.mdst
from parse import parse
class DataMode(enum.Enum):
    """Enumeration of data modes; each member's value equals its name."""

    raw = "raw"
    mdst = "mdst"
    cdst = "cdst"
    skimmed_raw = "skimmed_raw"
class DataTask(b2luigi.ExternalTask):
    """External task declaring the parameters shared by the data tasks."""

    # Enum-valued parameter restricted to the members of ``DataMode``.
    data_mode = b2luigi.EnumParameter(enum=DataMode)

    # Integer-valued parameters.
    experiment_number = b2luigi.IntParameter()
    run_number = b2luigi.IntParameter()

    # Plain parameters.
    prefix = b2luigi.Parameter()
    file_name = b2luigi.Parameter()
class RawDataTask(DataTask):
    """``DataTask`` whose ``data_mode`` is fixed to ``DataMode.raw``."""

    data_mode = DataMode.raw

    def output(self):
        """Yield a dict mapping ``raw_output.root`` to a local target."""
        # The target path is whatever ``_build_data_path`` returns for this task.
        raw_target = b2luigi.LocalTarget(_build_data_path(self))
        yield {"raw_output.root": raw_target}
class DstDataTask(DataTask):
release = b2luigi.Parameter()
prod = b2luigi.IntParameter()
database = b2luigi.IntParameter()
def output(self):
from os.path import join
import b2luigi
from b2luigi.basf2_helper.tasks import Basf2PathTask
import example_mdst_analysis
class MyAnalysisTask(Basf2PathTask):
    """Analysis task on top of ``Basf2PathTask`` that declares two ntuple outputs."""

    # Choosing "gbasf2" makes this task use the gbasf2 wrapper batch process.
    batch_system = "gbasf2"

    # Prefix for the gbasf2 project name submitted to the grid; b2luigi adds a
    # hash derived from the luigi parameters to create a unique project name.
    gbasf2_project_name_prefix = b2luigi.Parameter()
    gbasf2_input_dataset = b2luigi.Parameter(hashed=True)

    # Example luigi cut parameter: different values start different projects.
    mbc_lower_cut = b2luigi.IntParameter()

    def create_path(self):
        """Return the path built by ``example_mdst_analysis.create_analysis_path``.

        The ``mbc_range`` handed over spans from ``mbc_lower_cut`` to 5.3.
        """
        lower_and_upper = (self.mbc_lower_cut, 5.3)
        return example_mdst_analysis.create_analysis_path(
            d_ntuple_filename="D_ntuple.root",
            b_ntuple_filename="B_ntuple.root",
            mbc_range=lower_and_upper,
        )

    def output(self):
        """Yield the outputs ``D_ntuple.root`` and ``B_ntuple.root``, in that order."""
        for ntuple_name in ("D_ntuple.root", "B_ntuple.root"):
            yield self.add_to_output(ntuple_name)
from os.path import join
import b2luigi
from b2luigi.basf2_helper.tasks import Basf2PathTask
import example_mdst_analysis
class MyAnalysisTask(Basf2PathTask):
    """Analysis task on top of ``Basf2PathTask`` that declares two ntuple outputs."""

    # Choosing "gbasf2" makes this task use the gbasf2 wrapper batch process.
    batch_system = "gbasf2"

    # Prefix for the gbasf2 project name submitted to the grid; b2luigi adds a
    # hash derived from the luigi parameters to create a unique project name.
    gbasf2_project_name_prefix = b2luigi.Parameter()
    gbasf2_input_dataset = b2luigi.Parameter(hashed=True)

    # Example luigi cut parameter: different values start different projects.
    mbc_lower_cut = b2luigi.IntParameter()

    def create_path(self):
        """Return the path built by ``example_mdst_analysis.create_analysis_path``.

        The ``mbc_range`` handed over spans from ``mbc_lower_cut`` to 5.3.
        """
        lower_and_upper = (self.mbc_lower_cut, 5.3)
        return example_mdst_analysis.create_analysis_path(
            d_ntuple_filename="D_ntuple.root",
            b_ntuple_filename="B_ntuple.root",
            mbc_range=lower_and_upper,
        )

    def output(self):
        """Yield the outputs ``D_ntuple.root`` and ``B_ntuple.root``, in that order."""
        for ntuple_name in ("D_ntuple.root", "B_ntuple.root"):
            yield self.add_to_output(ntuple_name)
from parse import parse
class DataMode(enum.Enum):
    """Enumeration of data modes; each member's value equals its name."""

    raw = "raw"
    mdst = "mdst"
    cdst = "cdst"
    skimmed_raw = "skimmed_raw"
class DataTask(b2luigi.ExternalTask):
    """External task declaring the parameters shared by the data tasks."""

    # Enum-valued parameter restricted to the members of ``DataMode``.
    data_mode = b2luigi.EnumParameter(enum=DataMode)

    # Integer-valued parameters.
    experiment_number = b2luigi.IntParameter()
    run_number = b2luigi.IntParameter()

    # Plain parameters.
    prefix = b2luigi.Parameter()
    file_name = b2luigi.Parameter()
class RawDataTask(DataTask):
    """``DataTask`` whose ``data_mode`` is fixed to ``DataMode.raw``."""

    data_mode = DataMode.raw

    def output(self):
        """Yield a dict mapping ``raw_output.root`` to a local target."""
        # The target path is whatever ``_build_data_path`` returns for this task.
        raw_target = b2luigi.LocalTarget(_build_data_path(self))
        yield {"raw_output.root": raw_target}
class DstDataTask(DataTask):
    """``DataTask`` with additional ``release``, ``prod`` and ``database`` parameters."""

    release = b2luigi.Parameter()
    prod = b2luigi.IntParameter()
    database = b2luigi.IntParameter()

    def output(self):
        """Yield a dict mapping ``full_output.root`` to a local target."""
        # The target path is whatever ``_build_data_path`` returns for this task.
        full_target = b2luigi.LocalTarget(_build_data_path(self))
        yield {"full_output.root": full_target}