How to use the coffea.processor module in coffea

To help you get started, we’ve selected a few coffea examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github CoffeaTeam / coffea / tests / test_dask.py View on Github external
def do_dask_cached(client, filelist, cachestrategy=None):
    """Run NanoEventsProcessor over *filelist* on a dask *client* and check the cutflows."""
    from coffea.processor.test_items import NanoEventsProcessor

    executor_options = dict(
        client=client,
        nano=True,
        cachestrategy=cachestrategy,
        savemetrics=True,
    )
    hists, metrics = processor.run_uproot_job(
        filelist,
        'Events',
        processor_instance=NanoEventsProcessor(),
        executor=processor.dask_executor,
        executor_args=executor_options,
    )

    # Expected event counts for the bundled test samples.
    expected = {
        'ZJets_pt': 18,
        'ZJets_mass': 6,
        'Data_pt': 84,
        'Data_mass': 66,
    }
    for key, count in expected.items():
        assert( hists['cutflow'][key] == count )
github CoffeaTeam / coffea / tests / test_parsl.py View on Github external
def do_parsl_job(filelist, flatten=False, compression=0, config=None):
    """Run NanoTestProcessor over *filelist* with the parsl executor and check the cutflows."""
    from coffea.processor.test_items import NanoTestProcessor

    hists = processor.run_parsl_job(
        filelist,
        'Events',
        processor_instance=NanoTestProcessor(),
        executor=processor.executor.parsl_executor,
        executor_args={
            'flatten': flatten,
            'compression': compression,
            'config': config,
        },
    )

    assert( hists['cutflow']['ZJets_pt'] == 18 )
    assert( hists['cutflow']['ZJets_mass'] == 6 )
    assert( hists['cutflow']['Data_pt'] == 84 )
    assert( hists['cutflow']['Data_mass'] == 66 )
github CoffeaTeam / coffea / coffea / processor / test_items / NanoTestProcessor.py View on Github external
def __init__(self, columns=None):
    """Set up the test histograms and accumulator.

    Parameters
    ----------
    columns : list of str, optional
        Branch names this processor needs; defaults to an empty list.
    """
    # Fix: avoid a mutable default argument ([]) — a shared list would
    # leak state across instances if it were ever mutated.
    self._columns = columns if columns is not None else []
    dataset_axis = hist.Cat("dataset", "Primary dataset")
    mass_axis = hist.Bin("mass", r"$m_{\mu\mu}$ [GeV]", 30000, 0.25, 300)
    pt_axis = hist.Bin("pt", r"$p_{T}$ [GeV]", 30000, 0.25, 300)

    self._accumulator = processor.dict_accumulator({
        'mass': hist.Hist("Counts", dataset_axis, mass_axis),
        'pt': hist.Hist("Counts", dataset_axis, pt_axis),
        'cutflow': processor.defaultdict_accumulator(int),
    })
github CoffeaTeam / coffea / tests / test_dask.py View on Github external
def do_dask_cached(client, filelist, cachestrategy=None):
    """Process *filelist* with NanoEventsProcessor via dask, then verify cutflow counts."""
    from coffea.processor.test_items import NanoEventsProcessor

    hists, metrics = processor.run_uproot_job(
        filelist,
        'Events',
        processor_instance=NanoEventsProcessor(),
        executor=processor.dask_executor,
        executor_args={
            'client': client,
            'nano': True,
            'cachestrategy': cachestrategy,
            'savemetrics': True,
        },
    )

    assert( hists['cutflow']['ZJets_pt'] == 18 )
    assert( hists['cutflow']['ZJets_mass'] == 6 )
    assert( hists['cutflow']['Data_pt'] == 84 )
    assert( hists['cutflow']['Data_mass'] == 66 )
github CoffeaTeam / coffea / coffea / processor / test_items / NanoTestProcessor.py View on Github external
from coffea import hist, processor
from coffea.analysis_objects import JaggedCandidateArray as CandArray
import numpy as np


class NanoTestProcessor(processor.ProcessorABC):
    def __init__(self, columns=None):
        """Set up the test histograms and accumulator.

        Parameters
        ----------
        columns : list of str, optional
            Branch names this processor needs; defaults to an empty list.
        """
        # Fix: avoid a mutable default argument ([]) — a shared list would
        # leak state across instances if it were ever mutated.
        self._columns = columns if columns is not None else []
        dataset_axis = hist.Cat("dataset", "Primary dataset")
        mass_axis = hist.Bin("mass", r"$m_{\mu\mu}$ [GeV]", 30000, 0.25, 300)
        pt_axis = hist.Bin("pt", r"$p_{T}$ [GeV]", 30000, 0.25, 300)

        self._accumulator = processor.dict_accumulator({
            'mass': hist.Hist("Counts", dataset_axis, mass_axis),
            'pt': hist.Hist("Counts", dataset_axis, pt_axis),
            'cutflow': processor.defaultdict_accumulator(int),
        })

    @property
    def columns(self):
        """Branch names this processor was configured with (see ``__init__``'s ``columns``)."""
        return self._columns
github CoffeaTeam / coffea / tests / test_preloaded.py View on Github external
def test_preloaded_nanoevents():
    """Feed preloaded flat arrays through NanoEventsProcessor and check the cutflow."""
    branches = [
        'nMuon', 'Muon_pt', 'Muon_eta', 'Muon_phi', 'Muon_mass', 'Muon_charge',
        'nJet', 'Jet_eta',
    ]
    proc = NanoEventsProcessor(columns=branches)

    sample_path = os.path.abspath('tests/samples/nano_dy.root')
    tree = uproot.open(sample_path)['Events']
    arrays = tree.arrays(branches, flatten=True, namedecode='ascii')
    # Also exercise the PreloadedDataFrame constructor on the same arrays.
    df = processor.PreloadedDataFrame(tree.numentries, arrays)
    print(arrays)

    events = NanoEvents.from_arrays(arrays, metadata={'dataset': 'ZJets'})
    hists = proc.process(events)

    print(hists)
    assert( hists['cutflow']['ZJets_pt'] == 18 )
    assert( hists['cutflow']['ZJets_mass'] == 6 )

    # NOTE(review): presumably cross-references are unavailable for
    # preloaded arrays, so this access must raise — confirm against NanoEvents.
    with pytest.raises(RuntimeError):
        print(events.Muon.matched_jet)
github CoffeaTeam / coffea / tests / test_dask.py View on Github external
def do_dask_job(client, filelist, compression=0):
    """Run NanoTestProcessor over *filelist* on a dask *client* and check the cutflows."""
    from coffea.processor.test_items import NanoTestProcessor

    hists = processor.run_uproot_job(
        filelist,
        'Events',
        processor_instance=NanoTestProcessor(),
        executor=processor.dask_executor,
        executor_args={'client': client, 'compression': compression},
    )

    assert( hists['cutflow']['ZJets_pt'] == 18 )
    assert( hists['cutflow']['ZJets_mass'] == 6 )
    assert( hists['cutflow']['Data_pt'] == 84 )
    assert( hists['cutflow']['Data_mass'] == 66 )
github CoffeaTeam / coffea / coffea / processor / test_items / NanoTestProcessor.py View on Github external
def __init__(self, columns=None):
    """Set up the test histograms and accumulator.

    Parameters
    ----------
    columns : list of str, optional
        Branch names this processor needs; defaults to an empty list.
    """
    # Fix: avoid a mutable default argument ([]) — a shared list would
    # leak state across instances if it were ever mutated.
    self._columns = columns if columns is not None else []
    dataset_axis = hist.Cat("dataset", "Primary dataset")
    mass_axis = hist.Bin("mass", r"$m_{\mu\mu}$ [GeV]", 30000, 0.25, 300)
    pt_axis = hist.Bin("pt", r"$p_{T}$ [GeV]", 30000, 0.25, 300)

    self._accumulator = processor.dict_accumulator({
        'mass': hist.Hist("Counts", dataset_axis, mass_axis),
        'pt': hist.Hist("Counts", dataset_axis, pt_axis),
        'cutflow': processor.defaultdict_accumulator(int),
    })
github CoffeaTeam / coffea / coffea / processor / test_items / NanoEventsProcessor.py View on Github external
def __init__(self, columns=[], canaries=[]):
        self._columns = columns
        self._canaries = canaries
        dataset_axis = hist.Cat("dataset", "Primary dataset")
        mass_axis = hist.Bin("mass", r"$m_{\mu\mu}$ [GeV]", 30000, 0.25, 300)
        pt_axis = hist.Bin("pt", r"$p_{T}$ [GeV]", 30000, 0.25, 300)

        self._accumulator = processor.dict_accumulator(
            {
                'mass': hist.Hist("Counts", dataset_axis, mass_axis),
                'pt': hist.Hist("Counts", dataset_axis, pt_axis),
                'cutflow': processor.defaultdict_accumulator(int),
                'worker': processor.set_accumulator(),
            }
github FAST-HEP / fast-carpenter / fast_carpenter / backends / coffea.py View on Github external
Functions to run a job using Coffea
"""
import copy
from fast_carpenter.masked_tree import MaskedUprootTree
from collections import namedtuple
from coffea import processor
from coffea.processor import futures_executor, run_uproot_job


# Small record types used by this backend when running jobs through coffea.
EventRanger = namedtuple("EventRanger", "start_entry stop_entry entries_in_block")  # entry range of a block
SingleChunk = namedtuple("SingleChunk", "tree config")  # a tree paired with its chunk config
ChunkConfig = namedtuple("ChunkConfig", "dataset")  # dataset descriptor for a chunk
ConfigProxy = namedtuple("ConfigProxy", "name eventtype")  # dataset name and event-type label


class stages_accumulator(processor.AccumulatorABC):
    def __init__(self, stages):
        """Wrap *stages* for accumulation.

        A pristine deep copy is kept for ``identity()``; a second deep
        copy becomes the mutable working value.
        """
        self._zero = copy.deepcopy(stages)
        self._value = copy.deepcopy(self._zero)

    def identity(self):
        """Return a fresh accumulator built from the pristine stages."""
        blank = stages_accumulator(self._zero)
        return blank

    def __getitem__(self, idx):
        """Index into the working stages."""
        stages = self._value
        return stages[idx]

    def add(self, other):
        """Fold *other* into this accumulator, stage by stage.

        Stages without a ``merge`` attribute are left untouched.
        """
        for idx, own_stage in enumerate(self._value):
            if hasattr(own_stage, "merge"):
                own_stage.merge(other[idx])