Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
b"TStreamerObjectAny": TStreamerObjectAny,
b"TStreamerObjectAnyPointer": TStreamerObjectAnyPointer,
b"TStreamerString": TStreamerString,
b"TStreamerSTL": TStreamerSTL,
b"TStreamerSTLString": TStreamerSTLString,
b"TStreamerArtificial": TStreamerArtificial,
b"TObjArray": TObjArray}
streamercontext = ROOTDirectory._FileContext(None, streamerclasses, Compression(fCompress))
streamerkey = TKey.read(source, Cursor(fSeekInfo), streamercontext)
streamerinfos = _readstreamers(streamerkey._source, streamerkey._cursor, streamercontext)
classes = _defineclasses(streamerinfos)
context = ROOTDirectory._FileContext(streamerinfos, classes, Compression(fCompress))
keycursor = Cursor(fBEGIN)
mykey = TKey.read(source, keycursor, context)
return ROOTDirectory.read(source, keycursor, context, mykey)
else:
cursor, context, mykey = args
# See https://root.cern/doc/master/classTDirectoryFile.html.
fVersion, fDatimeC, fDatimeM, fNbytesKeys, fNbytesName = cursor.fields(source, ROOTDirectory._format3)
if fVersion <= 1000:
fSeekDir, fSeekParent, fSeekKeys = cursor.fields(source, ROOTDirectory._format4_small)
else:
fSeekDir, fSeekParent, fSeekKeys = cursor.fields(source, ROOTDirectory._format4_big)
subcursor = Cursor(fSeekKeys)
headerkey = TKey.read(source, subcursor, context)
def copied(self, index=None, origin=None, refs=None):
if index is None:
index = self.index
if origin is None:
origin = self.origin
if refs is None:
refs = self.refs
return Cursor(index, origin, refs)
_method(uproot.interp.jagged.asjagged.empty).__doc__ = interp_fragments["see1"]
_method(uproot.interp.jagged.asjagged.compatible).__doc__ = interp_fragments["see1"]
_method(uproot.interp.jagged.asjagged.numitems).__doc__ = interp_fragments["see1"]
_method(uproot.interp.jagged.asjagged.source_numitems).__doc__ = interp_fragments["see1"]
_method(uproot.interp.jagged.asjagged.fromroot).__doc__ = interp_fragments["see1"]
_method(uproot.interp.jagged.asjagged.destination).__doc__ = interp_fragments["see1"]
_method(uproot.interp.jagged.asjagged.fill).__doc__ = interp_fragments["see1"]
_method(uproot.interp.jagged.asjagged.clip).__doc__ = interp_fragments["see1"]
_method(uproot.interp.jagged.asjagged.finalize).__doc__ = interp_fragments["see1"]
# TODO: add asdtype asarray asdouble32 asstlbitset asjagged astable asobj asgenobj asstring STLVector STLString
################################################################ uproot.source.cursor.Cursor
uproot.source.cursor.Cursor.__doc__ = \
u"""Maintain a position in a :py:class:`Source ` that updates as data are read.
**Attributes, properties, and methods:**
- **index** (*int*) the position.
- **origin** (*int*) "beginning of buffer" position, used in the **refs** key in :py:func:`uproot.rootio._readobjany `.
- **refs** (``None`` or ``dict``-like) manages cross-references in :py:func:`uproot.rootio._readobjany `.
- :py:meth:`copied ` return a copy of this :py:class:`Cursor ` with modifications.
- :py:meth:`skipped ` return a copy of this :py:class:`Cursor ` with the **index** moved forward.
- :py:meth:`skip ` move the **index** of this :py:class:`Cursor ` forward.
- :py:meth:`fields ` interpret bytes in the :py:class:`Source ` with given data types and skip the **index** past them.
- :py:meth:`field ` interpret bytes in the :py:class:`Source ` with a given data type and skip the **index** past it.
- :py:meth:`bytes ` return a range of bytes from the :py:class:`Source ` and skip the **index** past it.
- :py:meth:`array ` return a range of bytes from the :py:class:`Source ` as a typed Numpy array and skip the **index** past it.
- :py:meth:`string ` read a string from the :py:class:`Source `, interpreting the first 1 or 5 bytes as a size and skip the **index** past it.
- :py:meth:`cstring ` read a null-terminated string from the :py:class:`Source ` and skip the **index** past it.
streamercontext = ROOTDirectory._FileContext(source.path, None, None, streamerclasses, uproot.source.compressed.Compression(fCompress), tfile)
streamerkey = TKey.read(source, Cursor(fSeekInfo), streamercontext, None)
streamerinfos, streamerinfosmap, streamerrules = _readstreamers(streamerkey._source, streamerkey._cursor, streamercontext, None)
else:
streamerinfos, streamerinfosmap, streamerrules = [], {}, []
classes = dict(globals())
classes.update(builtin_classes)
classes = _defineclasses(streamerinfos, classes)
context = ROOTDirectory._FileContext(source.path, streamerinfos, streamerinfosmap, classes, uproot.source.compressed.Compression(fCompress), tfile)
context.source = source
keycursor = Cursor(fBEGIN)
mykey = TKey.read(source, keycursor, context, None)
return ROOTDirectory.read(source, Cursor(fBEGIN + fNbytesName), context, mykey)
except Exception:
source.dismiss()
raise
else:
try:
if len(options) > 0:
raise TypeError("unrecognized options: {0}".format(", ".join(options)))
cursor, context, mykey = args
# See https://root.cern/doc/master/classTDirectoryFile.html.
fVersion, fDatimeC, fDatimeM, fNbytesKeys, fNbytesName = cursor.fields(source, ROOTDirectory._format3)
if fVersion <= 1000:
fSeekDir, fSeekParent, fSeekKeys = cursor.fields(source, ROOTDirectory._format4_small)
import subprocess
import json
# Make sure c file is named allstreamers.c
subprocess.run("root -l -q dev/allstreamers.c", shell=True)
f = uproot.open("dev/allstreamers.root")
# Check with json
data = json.load(open("dev/streamerversions.json"))
for x in f._context.streamerinfos:
if data[x._fName.decode("ascii")] != x._fClassVersion:
print("Old {0} version = {1}. New {0} version = {2}".format(x._fName, data[x._fName.decode("ascii")], x._fClassVersion))
tkey = uproot.rootio.TKey.read(f._context.source, uproot.source.cursor.Cursor(f._context.tfile["_fSeekInfo"]), None, None)
start = f._context.tfile["_fSeekInfo"] + tkey._fKeylen
streamerlen = tkey._fObjlen
with open("dev/allstreamers.root", "rb") as binary_file:
binary_file.seek(start)
couple_bytes = binary_file.read(streamerlen)
streamers = "streamers = {0}".format(repr(couple_bytes))
lines = []
for line in open("uproot/write/streamers.py"):
if line.startswith("streamers"):
lines.append(streamers)
else:
lines.append(line)
with open("uproot/write/streamers.py", "w") as streamerfile:
else:
self.fSeekKey, self.fSeekPdir = cursor.fields(source, self._format2_big)
self.fClassName = cursor.string(source)
self.fName = cursor.string(source)
if self.fSeekPdir == 0:
assert source.data(cursor.index, cursor.index + 1)[0] == 0
cursor.skip(1) # Top TDirectory fName and fTitle...
self.fTitle = cursor.string(source)
if self.fSeekPdir == 0:
assert source.data(cursor.index, cursor.index + 1)[0] == 0
cursor.skip(1) # ...are prefixed *and* null-terminated! Both!
# object size != compressed size means it's compressed
if self.fObjlen != self.fNbytes - self.fKeylen:
self._source = CompressedSource(context.compression, source, Cursor(self.fSeekKey + self.fKeylen), self.fNbytes - self.fKeylen, self.fObjlen)
self._cursor = Cursor(0, origin=-self.fKeylen)
# otherwise, it's uncompressed
else:
self._source = source
self._cursor = Cursor(self.fSeekKey + self.fKeylen, origin=self.fSeekKey)
self._context = context