Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _openfile(self, path, compression):
    """Normalise ``path`` to a plain string and set up the output sink.

    Path-like inputs are converted first: through ``os.fspath`` when the
    object is an ``os.PathLike``, else through its own ``__fspath__``
    method, else (for ``pathlib.Path`` instances) through ``str``.
    The compression setting is then stored, the tree dictionary is reset,
    a ``FileSink`` is created on the path, and the path plus the UTF-8
    encoded final path component are recorded on the instance.
    """
    # Fall back to an empty tuple when ``os`` has no ``PathLike``; the
    # isinstance check is then simply false.
    pathlike_types = getattr(os, "PathLike", ())

    if isinstance(path, pathlike_types):
        path = os.fspath(path)
    elif hasattr(path, "__fspath__"):
        path = path.__fspath__()
    elif path.__class__.__module__ == "pathlib":
        import pathlib
        if isinstance(path, pathlib.Path):
            path = str(path)

    self.compression = compression
    self._treedict = {}

    self._sink = uproot.write.sink.file.FileSink(path)
    self._path = path

    # basename() is the second element of os.path.split().
    basename = os.path.basename(path)
    self._filename = basename.encode("utf-8")
def __init__(self, fName, fNevBuf, fNevBufSize, fObjlen=0, fSeekKey=100, fSeekPdir=0, fBufferSize=0):
    """Populate the key fields for a basket.

    The class name is fixed to ``b"TBasket"``, the title to ``b"t"`` and
    the cycle to 0; the remaining fields are copied from the arguments.
    ``fDatime`` is taken from ``uproot.write.util.datime()`` and
    ``fNbytes`` is the sum of ``fObjlen`` and ``self.fKeylen``.
    """
    # Fixed header values.
    self.fClassName = b"TBasket"
    self.fTitle = b"t"
    self.fCycle = 0

    # Values supplied by the caller.
    self.fName = fName
    self.fObjlen = fObjlen
    self.fSeekKey = fSeekKey
    self.fSeekPdir = fSeekPdir
    self.fBufferSize = fBufferSize
    self.fNevBuf = fNevBuf
    self.fNevBufSize = fNevBufSize

    self.fDatime = uproot.write.util.datime()

    # Computed last: fKeylen is not assigned here and is read from the
    # instance, so the name fields above must already be in place.
    self.fNbytes = self.fObjlen + self.fKeylen
# NOTE(review): the header of the enclosing method lies outside this view;
# `items` is presumably its argument — confirm against the full file.
# Advance the branch's entry-number counter by the number of new items.
self._branch.fields["_fEntryNumber"] += len(items)
# Coerce the items to an array of the branch's dtype; only 1-D data is accepted.
basketdata = numpy.array(items, dtype=self._branch.type, copy=False)
if basketdata.ndim != 1:
raise NotImplementedError("Multi dimensional array support is coming soon")
# Raw bytes of the array.
# NOTE(review): numpy documents `tostring` as a deprecated alias of `tobytes`.
givenbytes = basketdata.tostring()
# Start at the file's current _fSeekFree offset and record that offset as the
# seek position of the basket at index (_fWriteBasket - 1).
cursor = uproot.write.sink.cursor.Cursor(self._branch.file._fSeekFree)
self._branch.fields["_fBasketSeek"][self._branch.fields["_fWriteBasket"] - 1] = cursor.index
# Key for this basket: one buffer entry per item, entry size equal to the
# dtype's item size, located at the same _fSeekFree offset.
key = BasketKey(fName=self._branch.name,
fNevBuf=len(items),
fNevBufSize=numpy.dtype(self._branch.type).itemsize,
fSeekKey=copy(self._branch.file._fSeekFree),
fSeekPdir=copy(self._branch.file._fBEGIN),
fBufferSize=32000)
# A second cursor at the key's position is handed (as a copy) to the compression writer.
keycursor = uproot.write.sink.cursor.Cursor(key.fSeekKey)
# Write the key, then the payload through the compression writer, then let
# the file react to the advanced cursor.
key.write(cursor, self._branch.file._sink)
uproot.write.compress.write(self._branch.file, cursor, givenbytes, self._branch.compression, key, copy(keycursor))
self._branch.file._expandfile(cursor)
# Mirror the branch's entry count onto the tree.
self._treelvl1._tree.fields["_fEntries"] = self._branch.fields["_fEntries"]
# Accumulate byte totals on the branch from the key's lengths, then mirror them onto the tree.
self._branch.fields["_fTotBytes"] += key.fObjlen + key.fKeylen
self._branch.fields["_fZipBytes"] += key.fNbytes
self._treelvl1._tree.fields["_fTotBytes"] = self._branch.fields["_fTotBytes"]
self._treelvl1._tree.fields["_fZipBytes"] = self._branch.fields["_fZipBytes"]
# Record this basket's byte count in the slot for the current write basket.
self._branch.fields["_fBasketBytes"][self._branch.fields["_fWriteBasket"] - 1] = key.fNbytes
# Push the updated tree counters and the branch's basket/entry counters through their stored cursors.
# NOTE(review): update_fields presumably rewrites fields already present in the sink — confirm.
self._treelvl1._tree.size_cursor.update_fields(self._branch.file._sink, self._tree_size, self._treelvl1._tree.fields["_fEntries"],
self._treelvl1._tree.fields["_fTotBytes"],
self._treelvl1._tree.fields["_fZipBytes"])
self._branch._writebasket_cursor.update_fields(self._branch.file._sink, self._branch._format_tbranch12,
self._branch.fields["_fWriteBasket"], self._branch.fields["_fEntryNumber"])
def write(self, cursor, sink):
    """Write this key's header strings through ``cursor`` into ``sink``.

    A new cursor at the caller's current index and the sink itself are
    stored on the instance before ``self.update()`` is invoked. The
    caller's cursor is then advanced by ``self._format1.size`` and the
    class name, name and title are written as strings, in that order.
    """
    start_index = cursor.index
    self.cursor = uproot.write.sink.cursor.Cursor(start_index)
    self.sink = sink
    self.update()

    # Step over the fixed-size part before emitting the strings.
    cursor.skip(self._format1.size)
    for text in (self.fClassName, self.fName, self.fTitle):
        cursor.write_string(sink, text)
def fKeylen(self):
    """Return the key length.

    The value is the size of ``_format1``, plus the length of the class
    name, name and title as computed by ``Cursor.length_strings``, plus the
    size of ``_format_basketkey``, plus one.
    """
    fixed_size = self._format1.size
    header_strings = [self.fClassName, self.fName, self.fTitle]
    strings_size = uproot.write.sink.cursor.Cursor.length_strings(header_strings)
    basket_size = self._format_basketkey.size
    return fixed_size + strings_size + basket_size + 1
def save_root(histogram: HistogramBase, path: str, name: Optional[str] = None):
    """Write histogram to a ROOT file.

    If a file already exists at ``path`` it is opened for update,
    otherwise a new file is created. The file is closed afterwards even
    if writing the histogram raises.

    Parameters
    ----------
    histogram : Any histogram
    path: path for the output file (perhaps should not exist?)
    name : The name of the histogram inside the file
        (defaults to ``histogram.name`` when None)
    """
    if name is None:
        name = histogram.name
    if os.path.isfile(path):
        # TODO: Not supported currently
        hfile = uproot.write.TFile.TFileUpdate(path)
    else:
        hfile = uproot.write.TFile.TFileCreate(path)
    try:
        write_root(histogram, hfile, name)
    finally:
        # Always release the file; previously an exception in write_root
        # skipped close() and left the file open.
        hfile.close()
def _writerootdir(self):
    """Create the root directory object and write it, with its key, at ``_fBEGIN``.

    The directory is stored on ``self._rootdir``. A ``TFile`` key sized to
    the directory is written first, followed by the directory itself, and
    ``_expandfile`` is then called with the advanced cursor.
    """
    cursor = uproot.write.sink.cursor.Cursor(self._fBEGIN)

    rootdir = uproot.write.TDirectory.TDirectory(self, self._filename, self._fNbytesName)
    self._rootdir = rootdir

    rootkey = uproot.write.TKey.TKey(b"TFile", self._filename, fObjlen=rootdir.size())

    # Key first, then the directory it describes.
    rootkey.write(cursor, self._sink)
    rootdir.write(cursor, self._sink)

    self._expandfile(cursor)
# Rewrite the free-segment record and the end-of-file header fields when the
# given cursor has moved beyond the current _fSeekFree offset.
def _expandfile(self, cursor):
# NOTE(review): indentation is lost in this view; presumably every statement
# below is guarded by this `if`, since `freekey` is only bound here — confirm.
if cursor.index > self._fSeekFree:
# Separate cursor at the same index, used for writing the free key and segment.
freecursor = uproot.write.sink.cursor.Cursor(cursor.index)
# The key is first built with fObjlen=0 so that its fNbytes can be used to
# position the TFree; both of its lengths are then grown by the segment's size.
freekey = uproot.write.TKey.TKey(b"TFile", self._filename, fObjlen=0, fSeekKey=cursor.index, fSeekPdir=self._fBEGIN)
freeseg = uproot.write.TFree.TFree(cursor.index + freekey.fNbytes)
freekey.fObjlen = freeseg.size()
freekey.fNbytes += freekey.fObjlen
# Key first, then the free segment, both through the separate cursor.
freekey.write(freecursor, self._sink)
freeseg.write(freecursor, self._sink)
# The free segment now starts at the cursor index and the file ends right after the free key's bytes.
self._fSeekFree = cursor.index
self._fEND = cursor.index + freekey.fNbytes
self._fNbytesFree = freekey.fNbytes
self._nfree = 1
# Push the four updated values through the stored end cursor using the _format_end layout.
self._endcursor.update_fields(self._sink, self._format_end, self._fEND, self._fSeekFree, self._fNbytesFree, self._nfree)
def __init__(self, tfile, fName, fNbytesName, fSeekDir=100, fSeekParent=0, fSeekKeys=0, allocationbytes=128, growfactor=8):
    """Initialise the directory's fields and its (initially empty) key table.

    Arguments are copied onto the instance. ``fNbytesKeys`` starts at the
    size of ``_format2``; ``fDatimeC`` comes from
    ``uproot.write.util.datime()``; ``fUUID`` is the two bytes
    ``00 01`` followed by the bytes of a fresh ``uuid.uuid1()``. A
    ``TFile`` head key is built from the name, the ``_format2`` size and
    the keys seek position, and the key mapping and per-name cycle counter
    start out empty.
    """
    # Caller-supplied values.
    self.tfile = tfile
    self.fName = fName
    self.fNbytesName = fNbytesName

    keys_header_size = self._format2.size
    self.fNbytesKeys = keys_header_size

    self.fSeekDir = fSeekDir
    self.fSeekParent = fSeekParent
    self.fSeekKeys = fSeekKeys

    self.fDatimeC = uproot.write.util.datime()
    self.fUUID = b'\x00\x01' + uuid.uuid1().bytes

    self.allocationbytes = allocationbytes
    self.growfactor = growfactor

    self.headkey = uproot.write.TKey.TKey(
        fClassName=b"TFile",
        fName=self.fName,
        fObjlen=self._format2.size,
        fSeekKey=self.fSeekKeys,
    )

    # Empty containers for the keys and the highest cycle per name.
    self.keys = collections.OrderedDict()
    self.maxcycle = collections.Counter()
# NOTE(review): this fragment starts inside an if/elif chain whose beginning,
# and the enclosing method header, are outside this view.
# Payload for this branch: the TH2D part, the bin-entries array, the
# _format_tprofile fields (error mode, Z min/max, Tsumwz, Tsumwz2) and
# finally the bin sum-of-weights-squared array.
buff = (self._put_th2d(cursor, name)
+ self._put_tarray(cursor, self._fields["_fBinEntries"]) +
cursor.put_fields(self._format_tprofile, self._fields["_fErrorMode"], self._fields["_fZmin"],
self._fields["_fZmax"], self._fields["_fTsumwz"], self._fields["_fTsumwz2"]) +
self._put_tarray(cursor, self._fields["_fBinSumw2"]))
# TProfile3D: same layout built on the TH3D part, using the T min/max and
# Tsumwt fields, with version number 8.
elif "TProfile3D" == self._fClassName.decode("utf-8"):
vers = 8
buff = (self._put_th3d(cursor, name)
+ self._put_tarray(cursor, self._fields["_fBinEntries"]) +
cursor.put_fields(self._format_tprofile, self._fields["_fErrorMode"], self._fields["_fTmin"],
self._fields["_fTmax"], self._fields["_fTsumwt"], self._fields["_fTsumwt2"]) +
self._put_tarray(cursor, self._fields["_fBinSumw2"]))
# Total length is the payload plus the count/version header; the count field
# stores (length - 4) OR'ed with kByteCountMask.
length = len(buff) + self._format_cntvers.size
cnt = numpy.int64(length - 4) | uproot.const.kByteCountMask
# Prepend the count/version header and hand the bytes to the compression writer.
givenbytes = copy_cursor.put_fields(self._format_cntvers, cnt, vers) + buff
uproot.write.compress.write(context, write_cursor, givenbytes, compression, key, keycursor)