import lz4.frame as lz4frame


def roundtrip_single_shot(data, block_size, content_checksum, frame_type,
                          compression_level, auto_flush):
    # Name and the leading data/block_size parameters are reconstructed;
    # the original snippet begins mid-signature.
    c_context = lz4frame.create_compression_context()
    compressed = lz4frame.compress_begin(
        c_context,
        source_size=len(data),
        compression_level=compression_level,
        block_size=block_size,
        content_checksum=content_checksum,
        frame_type=frame_type,
        auto_flush=auto_flush
    )
    compressed += lz4frame.compress_update(c_context, data)
    compressed += lz4frame.compress_end(c_context)

    d_context = lz4frame.create_decompression_context()
    decompressed, bytes_read = lz4frame.decompress(d_context, compressed)
    assert bytes_read == len(compressed)
    assert decompressed == data
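
# Usage sketch for the helper above; the parameter values are illustrative
# assumptions (only BLOCKSIZE_MAX4MB and CONTENTCHECKSUM_ENABLED are constants
# that already appear elsewhere on this page):
sample = b"the quick brown fox jumps over the lazy dog" * 1024
roundtrip_single_shot(
    sample,
    block_size=lz4frame.BLOCKSIZE_MAX4MB,
    content_checksum=lz4frame.CONTENTCHECKSUM_ENABLED,
    frame_type=0,            # assumed: 0 selects a regular (non-skippable) frame
    compression_level=5,
    auto_flush=False,
)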
def roundtrip_chunked(data, block_size, content_checksum, frame_type,
                      compression_level, auto_flush, c_chunks, d_chunks):
    # Name, parameter list and the opening of the compress_begin() call are
    # reconstructed; the original snippet begins mid-call.
    c_context = lz4frame.create_compression_context()
    compressed = lz4frame.compress_begin(
        c_context,
        source_size=len(data),
        compression_level=compression_level,
        block_size=block_size,
        content_checksum=content_checksum,
        frame_type=frame_type,
        auto_flush=auto_flush
    )

    # Feed the input to the compressor in c_chunks pieces.
    data_in = get_chunked(data, c_chunks)
    try:
        while True:
            compressed += lz4frame.compress_update(
                c_context,
                next(data_in)
            )
    except StopIteration:
        pass
    finally:
        del data_in
    compressed += lz4frame.compress_end(c_context)

    # Decompress the frame in d_chunks pieces and reassemble the output.
    d_context = lz4frame.create_decompression_context()
    compressed_in = get_chunked(compressed, d_chunks)
    decompressed = b''
    bytes_read = 0
    try:
        while True:
            d, b = lz4frame.decompress(d_context, next(compressed_in))
            decompressed += d
            bytes_read += b
    except StopIteration:
        pass
    finally:
        del compressed_in
    # assert bytes_read == len(compressed)
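
# The chunked round trip relies on a get_chunked() helper that is not shown on
# this page. Below is a minimal sketch of the assumed behaviour (yield a bytes
# object in roughly equal-sized slices); the name and semantics are assumptions,
# not part of the lz4 bindings:
def get_chunked(data, nchunks):
    # Ceiling division so the final partial slice is not dropped.
    step = max(1, -(-len(data) // nchunks))
    for start in range(0, len(data), step):
        yield data[start:start + step]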
def roundtrip_single_shot_with_cleanup(data, block_size, content_checksum,
                                       frame_type, compression_level, auto_flush):
    # Same single-shot round trip as above, but with the contexts and buffers
    # explicitly released at the end. Name and leading parameters reconstructed.
    c_context = lz4frame.create_compression_context()
    compressed = lz4frame.compress_begin(
        c_context,
        source_size=len(data),
        compression_level=compression_level,
        block_size=block_size,
        content_checksum=content_checksum,
        frame_type=frame_type,
        auto_flush=auto_flush
    )
    compressed += lz4frame.compress_update(c_context, data)
    compressed += lz4frame.compress_end(c_context)

    d_context = lz4frame.create_decompression_context()
    decompressed, bytes_read = lz4frame.decompress(d_context, compressed)
    assert bytes_read == len(compressed)
    assert decompressed == data

    del compressed
    del decompressed
    del c_context
    del d_context
# From a writer method: compress one frame's worth of blobs into the output
# file, then record the frame's location in a JSON metadata stream.
ctx = lz4frame.create_compression_context()
self.__file_write(
    lz4frame.compress_begin(
        ctx,
        block_size=lz4frame.BLOCKSIZE_MAX4MB,  # does no harm for larger blobs
        block_mode=lz4frame.BLOCKMODE_LINKED,
        compression_level=5,
        content_checksum=lz4frame.CONTENTCHECKSUM_ENABLED,
        # sorry, no per-block checksums yet
        auto_flush=False,
        source_size=self.__frlen,
    )
)
for blob, meta in self.__frame:
    self.__file_write(lz4frame.compress_update(ctx, blob))
self.__file_write(lz4frame.compress_end(ctx))

json.dump(
    {
        "type": "frame",
        "file_off": file_off,
        "file_size": self.__file_off - file_off,
        "text_off": self.__text_off - self.__frlen,
        "text_size": self.__frlen,
    },
    self.__meta,
    sort_keys=True,
)
self.__meta.write("\n")
for blob, meta in self.__frame:
    if meta is not None:
        json.dump(meta, self.__meta, sort_keys=True)
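
# A matching read path is not shown on this page. The sketch below shows how a
# frame written as above could be read back using the same context-based calls
# that appear in these snippets; the function name, file handling and metadata
# handling are illustrative assumptions:
import json


def read_frame(data_file, meta_line):
    # meta_line is one JSON line emitted by the writer above, e.g.
    # {"type": "frame", "file_off": ..., "file_size": ..., ...}
    meta = json.loads(meta_line)
    data_file.seek(meta["file_off"])
    frame_bytes = data_file.read(meta["file_size"])
    d_context = lz4frame.create_decompression_context()
    text, bytes_read = lz4frame.decompress(d_context, frame_bytes)
    assert bytes_read == len(frame_bytes)
    return text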