Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_compression_content1(self):
    """Compare selected "events" branches across the Zmumu sample files.

    The arrays read from ``Zmumu-uncompressed.root`` serve as the reference:
    the same branch read from each of the zlib, lzma, lz4 and zstd sample
    files must convert to an identical Python list.
    """
    branch_names = ["Type", "Event", "E1", "px1", "Q1", "M"]
    reference = uproot.open("tests/samples/Zmumu-uncompressed.root")["events"]

    # Open every file under test once, rather than re-opening all four
    # files for each branch inside the loop.
    trees_under_test = [
        uproot.open("tests/samples/Zmumu-zlib.root")["events"],
        uproot.open("tests/samples/Zmumu-lzma.root")["events"],
        uproot.open("tests/samples/Zmumu-lz4.root")["events"],
        uproot.open("tests/samples/Zmumu-zstd.root")["events"],
    ]

    for name, array in reference.arrays(branch_names).items():
        expected = array.tolist()
        for tree in trees_under_test:
            assert tree.array(name).tolist() == expected
def test_selection_2_weights_data(config_2, filename):
    """Apply the "test_selection_1" selection with an EventWeight weight.

    Builds the selection from ``config_2``, calls it on the "events" tree of
    ``filename`` with ``is_mc=False``, then checks the number of non-zero
    mask entries, the shape of the results header and the first results row.
    """
    selection = filters.build_selection(
        "test_selection_1", config_2, weights=["EventWeight"]
    )
    events = uproot.open(filename)["events"]
    mask = selection(events, is_mc=False)
    assert np.count_nonzero(mask) == 1486

    header = selection.results_header()
    assert len(header) == 2
    assert len(header[0]) == 9

    result = selection.results()
    assert len(result) == 4

    columns = header[0]
    first_row = result[0]
    assert first_row[columns.index("depth")] == 0
    assert first_row[columns.index("cut")] == "Any"

    passed_incl = columns.index("passed_incl")
    assert first_row[passed_incl] == 1486
    # The column immediately after "passed_incl" must hold the same count.
    assert first_row[passed_incl + 1] == 1486
def test_tree_arrays(self):
    """Read all branches of "tree" in simple.root and check their contents.

    The arrays are fetched and checked twice from the same tree object.
    The tree is then looked up in the file again and read a third time;
    no assertions on that last read are visible in this snippet.
    """
    root_file = uproot.open("tests/samples/simple.root")
    tree = root_file["tree"]

    # First read, then "get arrays again" from the very same tree object.
    for _ in range(2):
        arrays = tree.arrays()
        assert arrays[b"one"].tolist() == [1, 2, 3, 4]
        assert (
            arrays[b"two"].tolist()
            == numpy.array([1.1, 2.2, 3.3, 4.4], dtype=numpy.float32).tolist()
        )
        assert arrays[b"three"].tolist() == [b"uno", b"dos", b"tres", b"quatro"]

    # get tree again
    tree = root_file["tree"]
    arrays = tree.arrays()
def test_compression_identity(self):
    """Check the reported compression algorithm name and level of each sample.

    Each row is ``(path, expected algoname, expected level)``.  For the
    uncompressed samples the algorithm name is ``None`` and only the level
    is asserted.
    """
    expectations = (
        ("tests/samples/Zmumu-zlib.root", "zlib", 4),
        ("tests/samples/Zmumu-lzma.root", "lzma", 4),
        ("tests/samples/Zmumu-lz4.root", "lz4", 4),
        ("tests/samples/Zmumu-zstd.root", "zstd", 5),
        ("tests/samples/Zmumu-uncompressed.root", None, 0),
        ("tests/samples/HZZ-zlib.root", "zlib", 4),
        ("tests/samples/HZZ-lzma.root", "lzma", 4),
        ("tests/samples/HZZ-lz4.root", "lz4", 4),
        ("tests/samples/HZZ-zstd.root", "zstd", 5),
        ("tests/samples/HZZ-uncompressed.root", None, 0),
    )

    for path, algoname, level in expectations:
        if algoname is not None:
            assert uproot.open(path).compression.algoname == algoname
        # The file is opened afresh for the level check as well.
        assert uproot.open(path).compression.level == level
def test_double32(self):
    """Check that four branches of tree "T" agree with "fD64" to within 1e-4.

    Every element-wise ratio of "fF32", "fI32", "fI30" and "fI28" to "fD64"
    must lie strictly between 0.9999 and 1.0001.
    """
    tree = uproot.open("tests/samples/demo-double32.root")["T"]
    reference = tree.array("fD64")

    # Read every branch first, then form the ratios, then assert.
    branch_names = ("fF32", "fI32", "fI30", "fI28")
    values = [tree.array(name) for name in branch_names]
    ratios = [branch / reference for branch in values]

    for ratio in ratios:
        assert ratio.min() > 0.9999 and ratio.max() < 1.0001
def get_tree(file_name, top_particle, particles):
    """Load a RapidSim tree.

    Opens ``file_name`` with uproot, takes its 'DecayTree' and returns a
    dict mapping each name in ``particles`` to the ``np.stack`` of that
    particle's ``PX``, ``PY``, ``PZ`` and ``E`` ``*_TRUE`` branch arrays,
    each multiplied by 1000.0.  ``top_particle`` is accepted but not used.
    """
    decay_tree = uproot.open(file_name)['DecayTree']

    stacked = {}
    for particle in particles:
        components = []
        for coord in ('PX', 'PY', 'PZ', 'E'):
            branch_name = '{}_{}_TRUE'.format(particle, coord)
            # NOTE(review): the 1000.0 factor is presumably a unit
            # conversion (e.g. GeV -> MeV) — confirm against RapidSim output.
            components.append(1000.0 * decay_tree.array(branch_name))
        stacked[particle] = np.stack(components)
    return stacked
def test_issue55(self):
    """Check "nJet" and "nMuon" agree between the two small-dy sample files.

    The same branch is read from the "tree" of the with-offsets file and of
    the no-offsets file and compared with ``numpy.array_equal``.
    """
    withoffsets = uproot.open("tests/samples/small-dy-withoffsets.root")["tree"]
    nooffsets = uproot.open("tests/samples/small-dy-nooffsets.root")["tree"]

    for branch_name in ("nJet", "nMuon"):
        assert numpy.array_equal(
            withoffsets.array(branch_name), nooffsets.array(branch_name)
        )
def equal(left, right):
    """Return True when ``left`` and ``right`` match element by element.

    The two sequences must have the same length, and every pair of
    corresponding elements must satisfy ``numpy.array_equal``.
    """
    if len(left) != len(right):
        return False
    # all() stops at the first mismatching pair, like the explicit loop did.
    return all(numpy.array_equal(x, y) for x, y in zip(left, right))
def test_issue76(self):
    """Check the first "rootStrings" entry of the "Events" tree in issue76.root.

    The entry must list as ``[b"2", b"4"]`` and its first item must be a
    ``uproot.rootio.TString`` instance.
    """
    events = uproot.open("tests/samples/issue76.root")["Events"]
    assert list(events.array("rootStrings")[0]) == [b"2", b"4"]

    # Unpacking into two names also requires the entry to hold exactly
    # two items.
    first, _second = events.array("rootStrings")[0]
    assert isinstance(first, uproot.rootio.TString)
def __init__(self, fin, branch='Events', selected_branches=None,
             exclude_branches=None, identifier=None, label=None,
             chunk_size=1000, nevts=-1, specs=None, nan=np.nan, histograms=False,
             redirector='root://cms-xrd-global.cern.ch', verbose=0):
    """Open ``fin`` with uproot and set up the reader's initial state.

    Parameters read by the visible part of this constructor:

    - ``fin`` and ``redirector`` are passed to ``xfile``; the result is
      stored as ``self.fin`` and opened with ``uproot.open``.
    - ``branch`` is the key looked up in the opened file to get ``self.tree``.
    - ``identifier`` defaults to ``['run', 'event', 'luminosityBlock']``
      when falsy.
    - ``nevts`` of -1 means "use the tree's ``numentries``".
    - ``chunk_size`` is capped at the tree's ``numentries``.
    - ``nan`` is stored as a float; ``label`` and ``verbose`` are stored as
      given.

    ``selected_branches``, ``exclude_branches``, ``specs`` and
    ``histograms`` are not used in the lines visible here.
    """
    self.type = self.__class__.__name__
    self.fin = xfile(fin, redirector)
    self.verbose = verbose
    if self.verbose:
        print("Reading {}".format(self.fin))
    self.istream = uproot.open(self.fin)

    self.branches = {}
    self.gen = None
    self.out_branches = []
    self.identifier = identifier or ['run', 'event', 'luminosityBlock']

    self.tree = self.istream[branch]
    self.nrows = self.tree.numentries

    # -1 is the "all events" sentinel.
    if nevts != -1:
        self.nevts = nevts
    else:
        self.nevts = self.nrows

    self.label = label
    self.idx = -1
    self.chunk_idx = 0

    # Never use a chunk larger than the number of rows in the tree.
    if chunk_size < self.nrows:
        self.chunk_size = chunk_size
    else:
        self.chunk_size = self.nrows

    self.nan = float(nan)
    self.attrs = []
    self.shape = None
    self.cache = {}
    self.hdict = {}
def basket_numentries(self, i):
    """Return ``basket_numentries(i)`` from a freshly opened copy of this branch.

    The file at ``self._treelvl1._file._path`` is opened again with uproot,
    the tree is looked up by ``self._treelvl1.name`` and the branch by
    ``self.name``; the call is then delegated to that branch object.
    """
    reopened_file = uproot.open(self._treelvl1._file._path)
    reopened_tree = reopened_file[self._treelvl1.name]
    reopened_branch = reopened_tree[self.name]
    return reopened_branch.basket_numentries(i)