Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import pandas as pd
import pickle as pkl
import lz4.frame as lz4f
from coffea.util import numpy as np
from coffea.processor.spark.spark_executor import agg_histos_raw, reduce_histos_raw
from coffea.processor.test_items import NanoTestProcessor
# Two independent identity accumulators taken from the test processor.
proc = NanoTestProcessor()
one = proc.accumulator.identity()
two = proc.accumulator.identity()

# Each accumulator is pickled and then lz4-frame compressed; the blobs are
# kept as Python objects inside numpy object arrays.
hlist1 = [lz4f.compress(pkl.dumps(acc)) for acc in (one,)]
hlist2 = [lz4f.compress(pkl.dumps(acc)) for acc in (one, two)]
harray1, harray2 = (np.array(blobs, dtype='O') for blobs in (hlist1, hlist2))

# The aggregation helpers are fed pandas Series, the reduction a DataFrame
# with a single 'histos' column.
series1, series2 = pd.Series(harray1), pd.Series(harray2)
df = pd.DataFrame({'histos': harray2})

# correctness of these functions is checked in test_spark_executor
agg1, agg2 = (agg_histos_raw(series, proc, 1) for series in (series1, series2))
red = reduce_histos_raw(df, proc, 1)
encoding='ascii'
)
all_names = corrections[[columns[i] for i in range(4)]]
labels = np.unique(corrections[[columns[i] for i in range(4)]])
wrapped_up = {}
for label in labels:
etaMins = np.unique(corrections[np.where(all_names == label)][columns[4]])
etaMaxs = np.unique(corrections[np.where(all_names == label)][columns[5]])
etaBins = np.union1d(etaMins, etaMaxs).astype(np.double)
ptMins = np.unique(corrections[np.where(all_names == label)][columns[6]])
ptMaxs = np.unique(corrections[np.where(all_names == label)][columns[7]])
ptBins = np.union1d(ptMins, ptMaxs).astype(np.double)
discrMins = np.unique(corrections[np.where(all_names == label)][columns[8]])
discrMaxs = np.unique(corrections[np.where(all_names == label)][columns[9]])
discrBins = np.union1d(discrMins, discrMaxs).astype(np.double)
vals = np.zeros(shape=(len(discrBins) - 1, len(ptBins) - 1, len(etaBins) - 1),
dtype=corrections.dtype[10])
for i, eta_bin in enumerate(etaBins[:-1]):
for j, pt_bin in enumerate(ptBins[:-1]):
for k, discr_bin in enumerate(discrBins[:-1]):
this_bin = np.where((all_names == label) &
(corrections[columns[4]] == eta_bin) &
(corrections[columns[6]] == pt_bin) &
(corrections[columns[8]] == discr_bin))[0]
if len(this_bin) == 1:
vals[k, j, i] = corrections[this_bin][columns[10]][0]
elif len(this_bin) > 1:
raise Exception(
'Multiple formulas for the same bin: label={label} eta_bin={eta_bin} pt_bin={pt_bin} discr_bin={discr_bin}'.format(
label=label,
eta_bin=eta_bin,
def nan(self):
    """Report whether this object's upper edge (``self._hi``) is NaN.

    Returns
    -------
        The result of ``numpy.isnan`` applied to ``self._hi``.
    """
    upper_edge = self._hi
    return np.isnan(upper_edge)
raise Exception('Could not define dimension for {}'.format(whattype))
self._axes = deepcopy(dims)
self._feval_dim = None
vals_are_strings = ('string' in values.dtype.name or
'str' in values.dtype.name or
'unicode' in values.dtype.name or
'bytes' in values.dtype.name) # ....
if not isinstance(values, np.ndarray):
raise TypeError('values is not a numpy array, but %r' % type(values))
if not vals_are_strings:
raise Exception('Non-string values passed to dense_evaluated_lookup!')
if feval_dim is None:
raise Exception('Evaluation dimensions not specified in dense_evaluated_lookup')
funcs = np.zeros(shape=values.shape, dtype='O')
for i in range(values.size):
idx = np.unravel_index(i, shape=values.shape)
funcs[idx] = numbaize(values[idx], ['x'])
self._values = deepcopy(funcs)
# TODO: support for multidimensional functions and functions with variables other than 'x'
if len(feval_dim) > 1:
raise Exception('lookup_tools.evaluator only accepts 1D functions right now!')
self._feval_dim = feval_dim[0]
if not all(np.sort(self._bins) == self._bins):
raise ValueError("Binning not sorted!")
self._lo = self._bins[0]
self._hi = self._bins[-1]
# to make searchsorted differentiate inf from nan
self._bins = np.append(self._bins, np.inf)
self._interval_bins = np.r_[-np.inf, self._bins, np.nan]
self._bin_names = np.full(self._interval_bins[:-1].size, None)
elif isinstance(n_or_arr, numbers.Integral):
if lo is None or hi is None:
raise TypeError("Interpreting n_or_arr as uniform binning, please specify lo and hi values")
self._uniform = True
self._lo = lo
self._hi = hi
self._bins = n_or_arr
self._interval_bins = np.r_[-np.inf, np.linspace(self._lo, self._hi, self._bins + 1), np.inf, np.nan]
self._bin_names = np.full(self._interval_bins[:-1].size, None)
else:
raise TypeError("Cannot understand n_or_arr (nbins or binning array) type %r" % n_or_arr)
def __init__(self, size, storeIndividual=False):
    """Set up an all-ones weight array and empty bookkeeping containers.

    Parameters
    ----------
        size
            Forwarded to ``numpy.ones`` to create the initial weight array.
        storeIndividual : bool, optional
            Stored unchanged on the instance as ``_storeIndividual``.
            NOTE(review): presumably controls whether each added weight is
            kept separately -- confirm against the methods that read it.
    """
    # Every entry starts with a weight of one.
    self._weight = np.ones(size)
    # Three separate, initially empty dictionaries.
    self._weights, self._modifiers, self._weightStats = {}, {}, {}
    self._storeIndividual = storeIndividual
def _evaluate(self, *args):
    """ uncertainties = f(args)

    Looks up, for every entry, the bin of the first binning dimension and
    evaluates that bin's up/down functions on the (clipped) evaluation
    variable.

    Parameters
    ----------
        *args
            Positional input arrays. ``self._dim_args`` and
            ``self._eval_args`` map argument names to positions in ``args``.
            ``args[0].size`` fixes the number of output rows.

    Returns
    -------
        numpy.ndarray
            Array of shape ``(args[0].size, 2)`` and dtype float64. Column 0
            holds ``1 + up(vals)`` and column 1 holds ``1 - down(vals)``.
    """
    bin_vals = {argname: args[self._dim_args[argname]] for argname in self._dim_order}
    eval_vals = {argname: args[self._eval_args[argname]] for argname in self._eval_vars}

    # lookup the bins that we care about
    dim1_name = self._dim_order[0]
    dim1_edges = self._bins[dim1_name]
    # Values outside the binning are clamped into the first/last bin.
    dim1_indices = np.clip(
        np.searchsorted(dim1_edges, bin_vals[dim1_name], side='right') - 1,
        0,
        dim1_edges.size - 2,
    )

    # get clamp values and clip the inputs
    # np.float64 replaces the ``np.float`` alias, which was removed from NumPy.
    outs = np.ones(shape=(args[0].size, 2), dtype=np.float64)
    eval_input = eval_vals[self._eval_vars[0]]
    knot_lo, knot_hi = self._eval_knots[0], self._eval_knots[-1]
    for i in np.unique(dim1_indices):
        mask = np.where(dim1_indices == i)
        vals = np.clip(eval_input[mask], knot_lo, knot_hi)
        # outs[:, k] is a view, so the in-place update writes through to outs.
        outs[:, 0][mask] += self._eval_ups[i](vals)
        outs[:, 1][mask] -= self._eval_downs[i](vals)
    return outs
def add(self, name, selection):
    """Add a named mask

    Each added selection occupies one bit of the packed mask, so the number
    of selections that can be stored is the bit width of ``self._dtype``.

    Parameters
    ----------
        name : str
            name of the mask
        selection : numpy.ndarray
            a flat array of dtype bool.
            If not the first mask added, it must also have
            the same shape as previously added masks.

    Raises
    ------
        ValueError
            If ``selection`` is not a numpy boolean array, or its shape
            differs from the masks already stored.
        RuntimeError
            If every bit of ``self._dtype`` is already in use.
    """
    if not (isinstance(selection, np.ndarray) and selection.dtype == np.dtype('bool')):
        # Wrap in a 1-tuple so that a tuple-valued ``selection`` is formatted
        # as a single argument instead of being unpacked by the % operator.
        raise ValueError("PackedSelection only understands numpy boolean arrays, got %r" % (selection,))

    n_used = len(self._names)
    # One slot per bit of the storage dtype (64 for a 64-bit dtype).
    n_slots = np.dtype(self._dtype).itemsize * 8
    if n_used == 0:
        self._mask = np.zeros(shape=selection.shape, dtype=self._dtype)
    elif n_used >= n_slots:
        raise RuntimeError("Exhausted all slots for %r, consider a larger dtype or fewer selections" % self._dtype)
    elif self._mask.shape != selection.shape:
        raise ValueError("New selection '%s' has different shape than existing ones (%r vs. %r)" % (name, selection.shape, self._mask.shape))

    # Store the new selection in the next free bit.
    self._mask |= selection.astype(self._dtype) << n_used
    self._names.append(name)
def __init__(self, runs=None, lumis=None):
    """Initialise ``self.array`` from optional run and lumi inputs.

    Parameters
    ----------
        runs : optional
            When ``None`` the array stays empty with shape ``(0, 2)``.
        lumis : optional
            Only used when ``runs`` is given.
            NOTE(review): assumes ``runs`` and ``lumis`` are 1-D sequences of
            equal length -- confirm against callers.
    """
    # Default: an empty table with two columns.
    self.array = np.zeros(shape=(0, 2))
    if runs is None:
        return
    # Place runs and lumis side by side, then keep each distinct row once.
    stacked = np.c_[runs, lumis]
    self.array = np.unique(stacked, axis=0)