def test_lz4frame_open_write_read_text():
    data = u'This is a test string'
    with lz4frame.open('testfile', mode='wt') as fp:
        fp.write(data)
    with lz4frame.open('testfile', mode='rt') as fp:
        data_out = fp.read()
    assert data_out == data
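The same text round-trip works in memory, with no file involved; a minimal sketch using the one-shot compress/decompress API:

import lz4.frame

text = u'This is a test string'
blob = lz4.frame.compress(text.encode('utf-8'))     # bytes in, bytes out
assert lz4.frame.decompress(blob).decode('utf-8') == text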
    if store_size is True:
        kwargs['source_size'] = len(data)
    kwargs['compression_level'] = compression_level
    kwargs['block_size'] = block_size
    kwargs['block_linked'] = block_linked
    kwargs['content_checksum'] = content_checksum
    kwargs['block_checksum'] = block_checksum
    kwargs['auto_flush'] = auto_flush
    kwargs['return_bytearray'] = return_bytearray
    kwargs['mode'] = 'wb'

    with lz4frame.open('testfile', **kwargs) as fp:
        fp.write(data)

    with lz4frame.open('testfile', mode='r') as fp:
        data_out = fp.read()

    assert data_out == data
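The kwargs dict above maps straight onto lz4frame.open's keyword parameters. A minimal sketch with the values spelled inline (the constants are real lz4.frame module attributes; the particular choices here are illustrative, not from the original test):

import lz4.frame as lz4frame

data = b'x' * 1024
with lz4frame.open('testfile',
                   mode='wb',
                   block_size=lz4frame.BLOCKSIZE_MAX64KB,
                   block_linked=True,
                   compression_level=lz4frame.COMPRESSIONLEVEL_MINHC,
                   content_checksum=True,
                   block_checksum=False,
                   auto_flush=False,
                   source_size=len(data)) as fp:
    fp.write(data)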
def writeout_measurement(msm_jstr, fn, update, tid):
    """Safely write measurement to disk
    """
    # Different processes might be trying to write the same file at the same
    # time due to naming collisions. Use a safe tmpfile and atomic link
    # NamedTemporaryFile creates files with permissions 600
    # but we want other users (Nginx) to be able to read the measurement
    suffix = ".{}.tmp".format(os.getpid())
    with NamedTemporaryFile(suffix=suffix, dir=conf.msmtdir) as f:
        with lz4frame.open(f, "w") as lzf:
            lzf.write(msm_jstr)
            # os.fsync(lzf.fileno())

        final_fname = conf.msmtdir.joinpath(fn)
        try:
            os.chmod(f.name, 0o644)
            os.link(f.name, final_fname)
            metrics.incr("msmt_output_file_created")
        except FileExistsError:
            if update:
                # update access time - used for cache cleanup
                # no need to overwrite the file
                os.utime(final_fname)
                metrics.incr("msmt_output_file_updated")
            else:
                log.info(f"{tid} Refusing to overwrite {final_fname}")
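The write-then-link pattern generalizes beyond this codebase. A minimal standalone sketch of the same atomic-publish idea (the function and its parameter names are placeholders invented here):

import os
from tempfile import NamedTemporaryFile
import lz4.frame

def atomic_write_lz4(payload: bytes, dest_dir: str, name: str) -> None:
    # Write to a unique tmpfile, then hard-link it into place:
    # readers never observe a partially written file, and the tmpfile
    # is unlinked automatically when the context manager exits.
    with NamedTemporaryFile(suffix=".tmp", dir=dest_dir) as f:
        with lz4.frame.open(f, "wb") as lzf:
            lzf.write(payload)
        os.chmod(f.name, 0o644)
        os.link(f.name, os.path.join(dest_dir, name))  # raises FileExistsError on collision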
    @staticmethod
    def compress(data):  # pragma: no cover
        return data

    @staticmethod
    def decompress(data):  # pragma: no cover
        return data


try:
    import lz4.frame

    try:
        lz4.frame._compression.BUFFER_SIZE = BUFFER_SIZE
    except AttributeError:  # pragma: no cover
        pass

    lz4_open = functools.partial(lz4.frame.open, block_size=lz4.frame.BLOCKSIZE_MAX1MB)
    lz4_compress = functools.partial(lz4.frame.compress, block_size=lz4.frame.BLOCKSIZE_MAX1MB)
    lz4_compressobj = lz4.frame.LZ4FrameCompressor
    lz4_decompress = lz4.frame.decompress
    lz4_decompressobj = lz4.frame.LZ4FrameDecompressor
except ImportError:  # pragma: no cover
    lz4_open = None
    lz4_compress, lz4_compressobj = None, None
    lz4_decompress, lz4_decompressobj = None, None

gz_open = gzip.open
gz_compressobj = functools.partial(
    lambda level=-1: zlib.compressobj(level, zlib.DEFLATED, 16 + zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
)
gz_decompressobj = functools.partial(lambda: zlib.decompressobj(16 + zlib.MAX_WBITS))
gz_compress = gzip.compress
gz_decompress = gzip.decompress
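The *_compressobj/*_decompressobj factories exist so callers can compress incrementally rather than in one shot. A minimal sketch of the streaming lz4.frame API those names wrap:

import lz4.frame

comp = lz4.frame.LZ4FrameCompressor()
frame = comp.begin()                    # emits the frame header
frame += comp.compress(b"chunk one ")   # feed data piecewise
frame += comp.compress(b"chunk two")
frame += comp.flush()                   # emits the frame end mark

decomp = lz4.frame.LZ4FrameDecompressor()
assert decomp.decompress(frame) == b"chunk one chunk two"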
def load(filename):
    '''Load a coffea file from disk
    '''
    with lz4.frame.open(filename) as fin:
        output = cloudpickle.load(fin)
    return output
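A matching writer is the natural inverse; a minimal sketch pairing cloudpickle with lz4.frame (not necessarily the library's own save helper):

import cloudpickle
import lz4.frame

def save(output, filename):
    '''Save an object to disk as an lz4-compressed cloudpickle'''
    with lz4.frame.open(filename, mode='wb') as fout:
        fout.write(cloudpickle.dumps(output))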
                    break
                log.debug("Loading nested %s", m.name)
                k = tf.extractfile(m)
                assert k is not None
                if m.name.endswith(".json"):
                    for line in k:
                        yield (line, None)
                elif m.name.endswith(".yaml"):
                    continue  # FIXME
                    bucket_tstamp = "FIXME"
                    for msm in iter_yaml_msmt_normalized(k, bucket_tstamp):
                        yield (None, msm)
    elif fn.endswith(".json.lz4"):
        with lz4frame.open(fn) as f:
            for line in f:
                yield (line, None)
    elif fn.endswith(".yaml.lz4"):
        with lz4frame.open(fn) as f:
            raise Exception("Unsupported format: YAML")
            # bucket_tstamp = "FIXME"
            # for msm in iter_yaml_msmt_normalized(f, bucket_tstamp):
            #     metrics.incr("yaml_normalization")
            #     yield (None, msm)
    else:
        raise RuntimeError(fn)
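Since lz4frame.open returns an ordinary file object, the .json.lz4 branch above distills into a tiny standalone reader; a sketch (iter_json_lz4 is a name invented here):

import json
import lz4.frame

def iter_json_lz4(path):
    # The decompressed stream is newline-delimited JSON; the file
    # object from lz4.frame.open supports line iteration directly.
    with lz4.frame.open(path) as f:
        for line in f:
            yield json.loads(line)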
        zfile = ZipFile(pp)
        [subpath] = args  # meh?
        ## oh god... https://stackoverflow.com/a/5639960/706389
        ifile = zfile.open(subpath, mode='r')
        ifile.readable = lambda: True   # type: ignore
        ifile.writable = lambda: False  # type: ignore
        ifile.seekable = lambda: False  # type: ignore
        ifile.read1 = ifile.read        # type: ignore
        # TODO pass all kwargs here??
        # todo 'expected "BinaryIO"'??
        return io.TextIOWrapper(ifile, encoding=encoding)  # type: ignore[arg-type]
    elif suf in {'.lz4'}:
        import lz4.frame  # type: ignore
        return lz4.frame.open(str(pp), mode, *args, **kwargs)
    elif suf in {'.zstd'}:
        return _zstd_open(pp, mode, *args, **kwargs)
    else:
        return pp.open(mode, *args, **kwargs)
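Stripped of the zip special-casing, the suffix dispatch reduces to a few lines. A hypothetical minimal version (open_by_suffix is a name invented here, not from the original):

from pathlib import Path
import gzip
import lz4.frame

def open_by_suffix(path, mode='rt', *args, **kwargs):
    # Pick a compression-aware opener based on the file extension.
    pp = Path(path)
    if pp.suffix == '.lz4':
        return lz4.frame.open(str(pp), mode, *args, **kwargs)
    if pp.suffix == '.gz':
        return gzip.open(pp, mode, *args, **kwargs)
    return pp.open(mode, *args, **kwargs)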
"""
if callable(obj):
objname = f"key={key} function={obj.__name__} hash={objhash}"
else:
objname = f"key={key} literal={type(obj)} hash={objhash}"
logger.info(f"EXECUTE {objname}")
ret = obj(*args, **kwargs) if callable(obj) else obj
if not skipcache:
fileext = ".pickle.lz4" if compress else ".pickle"
filepath = fs.path.join(CACHE_DIRNAME, objhash + fileext)
if not storage.isfile(filepath):
logger.info(f"STORE {objname}")
try:
with storage.open(filepath, "wb") as fid:
if compress:
with lz4.frame.open(fid, mode='wb') as _fid:
joblib.dump(ret, _fid, protocol=4)
else:
joblib.dump(ret, fid, protocol=4)
except Exception:
logger.exception("Could not dump object.")
else:
logger.warning(f"FILE_EXISTS {objname}")
return ret
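The read side mirrors the dump; a minimal sketch under the same assumptions (storage, filepath and compress as above; load_cached is a name invented here, and joblib.load accepts a file object):

import joblib
import lz4.frame

def load_cached(storage, filepath, compress):
    # Hypothetical counterpart to the STORE branch above: route the
    # read through the lz4 wrapper only when the cache was compressed.
    with storage.open(filepath, "rb") as fid:
        if compress:
            with lz4.frame.open(fid, mode='rb') as _fid:
                return joblib.load(_fid)
        return joblib.load(fid)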
def iter_yaml_lz4_reports(fn):
    """Iterate YAML reports from a lz4 file
    """
    assert str(fn).endswith("lz4")

    fd = lz4frame.open(fn)
    blobgen = stream_yaml_blobs(fd)

    off, header = next(blobgen)
    headsha = hashlib.sha1(header)
    # XXX: bad header kills whole bucket
    header = yaml.load(header, Loader=CLoader)
    if not header.get("report_id"):
        header["report_id"] = generate_report_id(header)

    for off, entry in blobgen:
        entry_len = len(entry)
        esha = headsha.copy()
        esha.update(entry)
        esha = esha.digest()
        try:
            entry = yaml.load(entry, Loader=CLoader)