Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Fragment of a task's run logic; the enclosing `def` is not part of this excerpt.
print("Hello!")
# First output file: the path is resolved through get_output_file_name.
with open(self.get_output_file_name("some_file.txt"), "w") as f:
f.write("Done")
print("Bye!")
import sys
# Flush stdout explicitly before the process signals itself below.
sys.stdout.flush()
# Sends signal 11 to the current process (SIGSEGV on Linux; presumably a
# deliberate crash to exercise failure handling -- TODO confirm).
os.kill(os.getpid(), 11)
# Second output file: this write comes after the kill call above.
with open(self.get_output_file_name("some_other_file.txt"), "w") as f:
f.write("Done")
if __name__ == "__main__":
    # Set the "result_dir" setting to "results", then process one MyTask.
    b2luigi.set_setting("result_dir", "results")
    task = MyTask()
    b2luigi.process(task)
class MyNumberTask(b2luigi.Task):
    """Task that writes a single random number to ``output_file.txt``."""

    # Integer parameter of the task; instances are created with one value each.
    some_parameter = b2luigi.IntParameter()

    def output(self):
        """Yield the output registered under the key ``output_file.txt``."""
        yield self.add_to_output("output_file.txt")

    def run(self):
        """Draw one value from ``random.random()`` and write it, newline-terminated."""
        random_number = random.random()
        destination = self.get_output_file_name("output_file.txt")
        with open(destination, "w") as handle:
            handle.write(f"{random_number}\n")
if __name__ == "__main__":
    # Build 100 tasks (some_parameter = 0..99) and process them with 200 workers.
    b2luigi.set_setting("result_dir", "results")
    tasks = [MyNumberTask(some_parameter=i) for i in range(100)]
    b2luigi.process(tasks, workers=200)
# Fragment of an averaging task's run logic; the enclosing `def` is not part of this excerpt.
# Running total and count of the values read from the input files.
summed_numbers = 0
counter = 0
for input_file in self.get_input_file_names("output_file.txt"):
# The whole content of each input file is parsed as one float.
with open(input_file, "r") as f:
summed_numbers += float(f.read())
counter += 1
# NOTE(review): if there are no input files, counter is still 0 here and this
# division raises ZeroDivisionError -- confirm that inputs are always present.
average = summed_numbers / counter
# The average is written newline-terminated to "average.txt".
with open(self.get_output_file_name("average.txt"), "w") as f:
f.write(f"{average}\n")
if __name__ == "__main__":
    # Set the "result_dir" setting to "results", then process the averaging task.
    b2luigi.set_setting("result_dir", "results")
    average_task = MyAverageTask()
    b2luigi.process(average_task, workers=200)
class MyNumberTask(b2luigi.Task):
    """Task that writes a single random number to an explicitly named local target."""

    # Integer parameter of the task; it is embedded in the output file name.
    some_parameter = b2luigi.IntParameter()

    def output(self):
        """Return the local target ``results/output_file_<some_parameter>.txt``."""
        target_path = f"results/output_file_{self.some_parameter}.txt"
        return b2luigi.LocalTarget(target_path)

    def run(self):
        """Draw one value from ``random.random()`` and write it through the target."""
        random_number = random.random()
        target = self.output()
        with target.open("w") as handle:
            handle.write(f"{random_number}\n")
if __name__ == "__main__":
    # Build 100 tasks (some_parameter = 0..99) and process them with 200 workers.
    b2luigi.set_setting("result_dir", "results")
    number_tasks = [MyNumberTask(some_parameter=i) for i in range(100)]
    b2luigi.process(number_tasks, workers=200)
# Fragment of a task's run logic; the enclosing `def` is not part of this excerpt.
print("Hello!")
# First output file: the path is resolved through get_output_file_name.
with open(self.get_output_file_name("some_file.txt"), "w") as f:
f.write("Done")
print("Bye!")
import sys
# Flush stdout explicitly before the process signals itself below.
sys.stdout.flush()
# Sends signal 11 to the current process (SIGSEGV on Linux; presumably a
# deliberate crash to exercise failure handling -- TODO confirm).
os.kill(os.getpid(), 11)
# Second output file: this write comes after the kill call above.
with open(self.get_output_file_name("some_other_file.txt"), "w") as f:
f.write("Done")
if __name__ == "__main__":
    # Set the "result_dir" setting to "results", then process one MyTask.
    b2luigi.set_setting("result_dir", "results")
    task = MyTask()
    b2luigi.process(task)
def output(self):
    """Yield the outputs registered for the two n-tuple files, D first, then B."""
    for file_name in ("D_n_tuple.root", "B_n_tuple.root"):
        yield self.add_to_output(file_name)
class MasterTask(Basf2nTupleMergeTask):
    """Merge task that requires one AnalysisTask per member of SimulationType."""

    # Integer parameter of the task (set to 1 where the task is instantiated).
    n_events = luigi.IntParameter()

    def requires(self):
        """Yield a clone of this task as AnalysisTask for every SimulationType member."""
        for simulation_type in SimulationType:
            requirement = self.clone(AnalysisTask, event_type=simulation_type)
            yield requirement
if __name__ == "__main__":
    # Process a MasterTask with n_events=1, using four workers.
    master_task = MasterTask(n_events=1)
    luigi.process(master_task, workers=4)
"4S/r00000/mixed/mdst/sub00/mdst_000255_prod00009434_task10020000255.root")
# Fragment of a generator body; the enclosing `def` is not part of this excerpt.
# To scan additional cuts, append more values to this list: the loop below
# yields one MyAnalysisTask per entry.
mbc_lower_cuts = [5.15, 5.2]
for mbc_lower_cut in mbc_lower_cuts:
yield MyAnalysisTask(
mbc_lower_cut=mbc_lower_cut,
gbasf2_project_name_prefix="luigiExample",
# input_dataset is presumably assigned just before this excerpt -- verify.
gbasf2_input_dataset=input_dataset,
max_event=100,
)
if __name__ == '__main__':
    main_task_instance = MasterTask()
    # Use one worker per task yielded by the main task's requires().
    n_gbasf2_tasks = sum(1 for _ in main_task_instance.requires())
    b2luigi.process(main_task_instance, workers=n_gbasf2_tasks)