from coffea import processor


def do_dask_cached(client, filelist, cachestrategy=None):
    # Helper used by the Dask executor tests: runs NanoEventsProcessor over a
    # filelist with an optional NanoEvents cache strategy and checks the
    # expected cutflow counts.
    from coffea.processor.test_items import NanoEventsProcessor

    exe_args = {
        'client': client,
        'nano': True,
        'cachestrategy': cachestrategy,
        'savemetrics': True,
    }
    hists, metrics = processor.run_uproot_job(
        filelist,
        'Events',
        processor_instance=NanoEventsProcessor(),
        executor=processor.dask_executor,
        executor_args=exe_args,
    )
    assert hists['cutflow']['ZJets_pt'] == 18
    assert hists['cutflow']['ZJets_mass'] == 6
    assert hists['cutflow']['Data_pt'] == 84
    assert hists['cutflow']['Data_mass'] == 66
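
# A minimal sketch of invoking do_dask_cached; the local Dask cluster, the
# sample paths, and the 'dask-worker' cache strategy value are assumptions
# for illustration, not part of the original snippet.
from dask.distributed import Client

client = Client()  # spins up a local cluster
filelist = {
    'ZJets': ['tests/samples/nano_dy.root'],
    'Data': ['tests/samples/nano_dimuon.root'],
}
do_dask_cached(client, filelist, cachestrategy='dask-worker')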

def do_parsl_job(filelist, flatten=False, compression=0, config=None):
    # Helper used by the Parsl executor tests: runs NanoTestProcessor over a
    # filelist and checks the expected cutflow counts.
    from coffea.processor.test_items import NanoTestProcessor

    treename = 'Events'
    proc = NanoTestProcessor()
    exe_args = {
        'flatten': flatten,
        'compression': compression,
        'config': config,
    }
    hists = processor.run_parsl_job(filelist,
                                    treename,
                                    processor_instance=proc,
                                    executor=processor.executor.parsl_executor,
                                    executor_args=exe_args)
    assert hists['cutflow']['ZJets_pt'] == 18
    assert hists['cutflow']['ZJets_mass'] == 6
    assert hists['cutflow']['Data_pt'] == 84
    assert hists['cutflow']['Data_mass'] == 66
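
# A minimal sketch of calling do_parsl_job; the thread-pool Parsl config and
# its executor label are assumptions here, as are the sample paths.
from parsl.config import Config
from parsl.executors.threads import ThreadPoolExecutor

cfg = Config(executors=[ThreadPoolExecutor(label='coffea_parsl_default')])
filelist = {
    'ZJets': ['tests/samples/nano_dy.root'],
    'Data': ['tests/samples/nano_dimuon.root'],
}
do_parsl_job(filelist, config=cfg)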

from coffea import hist, processor
from coffea.analysis_objects import JaggedCandidateArray as CandArray
import numpy as np


class NanoTestProcessor(processor.ProcessorABC):
    # Minimal test processor: books dimuon mass and muon pt histograms plus
    # an integer cutflow, keyed by primary dataset.
    def __init__(self, columns=[]):
        self._columns = columns
        dataset_axis = hist.Cat("dataset", "Primary dataset")
        mass_axis = hist.Bin("mass", r"$m_{\mu\mu}$ [GeV]", 30000, 0.25, 300)
        pt_axis = hist.Bin("pt", r"$p_{T}$ [GeV]", 30000, 0.25, 300)
        self._accumulator = processor.dict_accumulator({
            'mass': hist.Hist("Counts", dataset_axis, mass_axis),
            'pt': hist.Hist("Counts", dataset_axis, pt_axis),
            'cutflow': processor.defaultdict_accumulator(int),
        })

    @property
    def columns(self):
        return self._columns
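
    # The snippet ends at the columns property; the remaining ProcessorABC
    # hooks are sketched below as assumptions. The real process() in coffea's
    # test items also performs the event selection and histogram filling,
    # which is omitted here.
    @property
    def accumulator(self):
        return self._accumulator

    def process(self, df):
        output = self.accumulator.identity()
        # ... select opposite-charge muon pairs and fill output here ...
        return output

    def postprocess(self, accumulator):
        return accumulator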

# Imports assumed by this test snippet (uproot3-era coffea API).
import os
import pytest
import uproot
from coffea import processor
from coffea.nanoaod import NanoEvents
from coffea.processor.test_items import NanoEventsProcessor


def test_preloaded_nanoevents():
    columns = ['nMuon', 'Muon_pt', 'Muon_eta', 'Muon_phi', 'Muon_mass',
               'Muon_charge', 'nJet', 'Jet_eta']
    p = NanoEventsProcessor(columns=columns)
    tree = uproot.open(os.path.abspath('tests/samples/nano_dy.root'))['Events']
    arrays = tree.arrays(columns, flatten=True, namedecode='ascii')
    df = processor.PreloadedDataFrame(tree.numentries, arrays)
    print(arrays)
    events = NanoEvents.from_arrays(arrays, metadata={'dataset': 'ZJets'})
    hists = p.process(events)
    print(hists)
    assert hists['cutflow']['ZJets_pt'] == 18
    assert hists['cutflow']['ZJets_mass'] == 6
    # matched_jet needs cross-references that were not preloaded, so this raises.
    with pytest.raises(RuntimeError):
        print(events.Muon.matched_jet)

def do_dask_job(client, filelist, compression=0):
    # Helper used by the Dask executor tests: runs NanoTestProcessor over a
    # filelist with optional pickle compression and checks the cutflow.
    from coffea.processor.test_items import NanoTestProcessor

    treename = 'Events'
    proc = NanoTestProcessor()
    exe_args = {
        'client': client,
        'compression': compression,
    }
    hists = processor.run_uproot_job(filelist,
                                     treename,
                                     processor_instance=proc,
                                     executor=processor.dask_executor,
                                     executor_args=exe_args)
    assert hists['cutflow']['ZJets_pt'] == 18
    assert hists['cutflow']['ZJets_mass'] == 6
    assert hists['cutflow']['Data_pt'] == 84
    assert hists['cutflow']['Data_mass'] == 66
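
# As with do_dask_cached above, a brief sketch of driving do_dask_job; the
# in-process client and the paths are assumptions for illustration.
from dask.distributed import Client

client = Client(processes=False)
do_dask_job(client,
            {'ZJets': ['tests/samples/nano_dy.root'],
             'Data': ['tests/samples/nano_dimuon.root']},
            compression=1)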

def __init__(self, columns=[], canaries=[]):
    # Constructor variant that additionally records cache 'canaries' and the
    # set of workers that processed chunks (note the extra 'worker' entry).
    self._columns = columns
    self._canaries = canaries
    dataset_axis = hist.Cat("dataset", "Primary dataset")
    mass_axis = hist.Bin("mass", r"$m_{\mu\mu}$ [GeV]", 30000, 0.25, 300)
    pt_axis = hist.Bin("pt", r"$p_{T}$ [GeV]", 30000, 0.25, 300)
    self._accumulator = processor.dict_accumulator(
        {
            'mass': hist.Hist("Counts", dataset_axis, mass_axis),
            'pt': hist.Hist("Counts", dataset_axis, pt_axis),
            'cutflow': processor.defaultdict_accumulator(int),
            'worker': processor.set_accumulator(),
        }
    )
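
# The 'worker' entry above is a processor.set_accumulator, which accumulates
# by set union; a short illustration of the semantics (worker names made up):
acc = processor.set_accumulator()
acc.add('worker-1')                # single elements are inserted
acc.add({'worker-1', 'worker-2'})  # sets are unioned in
assert acc == {'worker-1', 'worker-2'}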

"""
Functions to run a job using Coffea
"""
import copy
from collections import namedtuple

from fast_carpenter.masked_tree import MaskedUprootTree
from coffea import processor
from coffea.processor import futures_executor, run_uproot_job

EventRanger = namedtuple("EventRanger", "start_entry stop_entry entries_in_block")
SingleChunk = namedtuple("SingleChunk", "tree config")
ChunkConfig = namedtuple("ChunkConfig", "dataset")
ConfigProxy = namedtuple("ConfigProxy", "name eventtype")
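
# How the namedtuples above might compose into one chunk; the field values
# are hypothetical (tree would be a MaskedUprootTree in real use):
dataset = ConfigProxy(name='ZJets', eventtype='mc')
chunk = SingleChunk(tree=None, config=ChunkConfig(dataset=dataset))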

class stages_accumulator(processor.AccumulatorABC):
    # Wraps a list of fast_carpenter stages so coffea can accumulate them.
    def __init__(self, stages):
        self._zero = copy.deepcopy(stages)
        self._value = copy.deepcopy(stages)

    def identity(self):
        return stages_accumulator(self._zero)

    def __getitem__(self, idx):
        return self._value[idx]

    def add(self, other):
        # Merge each stage that supports merging; silently skip the rest.
        for i, stage in enumerate(self._value):
            if not hasattr(stage, "merge"):
                continue
            stage.merge(other[i])
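
# A sketch of the accumulator contract: identity() returns a zeroed copy and
# add() merges stage results element-wise. DummyStage is hypothetical; it
# mimics stages that expose merge(), as the hasattr check above suggests.
class DummyStage:
    def __init__(self):
        self.n = 0

    def merge(self, other):
        self.n += other.n


total = stages_accumulator([DummyStage()])
partial = stages_accumulator([DummyStage()])
partial[0].n = 2
total.add(partial)
assert total[0].n == 2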