Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_roundtrip_multiframe_3(data):
    """Round-trip several concatenated frames through a single context pair.

    ``nframes`` frames holding the same ``data`` are written back to back
    with one compression context, then read back one frame per
    ``decompress_chunk`` call with one decompression context.

    Args:
        data: payload to compress; supplied by the caller (presumably a
            pytest fixture -- confirm against the test module's conftest).
    """
    nframes = 4

    ctx = lz4frame.create_compression_context()
    pieces = []
    for _ in range(nframes):
        pieces.append(lz4frame.compress_begin(ctx))
        pieces.append(lz4frame.compress_chunk(ctx, data))
        pieces.append(lz4frame.compress_flush(ctx))
    compressed = b''.join(pieces)

    # Every frame is produced from the same data with the same settings, so
    # each one is expected to occupy an equal share of the buffer.
    frame_size = len(compressed) // nframes

    ctx = lz4frame.create_decompression_context()
    outputs = []
    offset = 0
    for _ in range(nframes):
        # Feed only the unread tail so each pass decodes the *next* frame
        # instead of decoding the first frame again from offset 0.
        d, bytes_read, eof = lz4frame.decompress_chunk(
            ctx,
            compressed[offset:],
        )
        outputs.append(d)
        assert eof is True
        assert bytes_read == frame_size
        offset += bytes_read
    decompressed = b''.join(outputs)

    # All frames must have been consumed, not just the first one repeatedly.
    assert offset == len(compressed)
    assert len(decompressed) == nframes * len(data)
    assert data * nframes == decompressed
compression_level, auto_flush):
c_context = lz4frame.create_compression_context()
compressed = lz4frame.compress_begin(
c_context,
source_size=len(data),
compression_level=compression_level,
block_size=block_size,
content_checksum=content_checksum,
frame_type=frame_type,
auto_flush=auto_flush
)
compressed += lz4frame.compress_update(
c_context,
data)
compressed += lz4frame.compress_end(c_context)
d_context = lz4frame.create_decompression_context()
decompressed, bytes_read = lz4frame.decompress(d_context, compressed)
assert bytes_read == len(compressed)
assert decompressed == data
del compressed
del decompressed
del c_context
del d_context
def test_create_decompression_context():
    """Creating a decompression context must yield something other than None."""
    assert lz4frame.create_decompression_context() is not None
def test_decompress_chunk_return_type_4():
    """Check the result types of decompress_chunk with return_bytearray=True.

    An empty payload is compressed to bytes, decompressed in one chunk, and
    the result must be a tuple whose first three items are a bytearray, an
    int and a bool, in that order.
    """
    frame = lz4frame.compress(b'', return_bytearray=False)
    context = lz4frame.create_decompression_context()
    result = lz4frame.decompress_chunk(context, frame, return_bytearray=True)

    assert isinstance(result, tuple)
    # Checked positionally, in order, exactly like individual index asserts.
    for position, expected_type in enumerate((bytearray, int, bool)):
        assert isinstance(result[position], expected_type)
def test_reset_decompression_context_2():
    """Check that a decompression context is usable again after a reset.

    The checks only run when ``lz4.library_version_number()`` reports
    10800 or later; for older library versions the test does nothing.
    """
    if lz4.library_version_number() < 10800:
        return

    c = lz4frame.compress(b'1234', return_bytearray=False)
    context = lz4frame.create_decompression_context()
    try:
        # Simulate an error by passing junk to decompress
        lz4frame.decompress_chunk(context, c[1:3])
    except Exception:
        # Best effort: whether or not the junk input raises, the context is
        # reset below. Only ordinary errors are swallowed, so Ctrl-C and
        # interpreter exit still propagate.
        pass

    r = lz4frame.reset_decompression_context(context)
    assert r is None

    # And confirm we can use the context after reset
    d, bytes_read, eof = lz4frame.decompress_chunk(context, c)
    assert d == b'1234'
    assert bytes_read == len(c)
    assert eof is True
def test_decompress_chunk_return_type_3():
    """Check the result types of decompress_chunk with return_bytearray=False.

    An empty payload is compressed to bytes, decompressed in one chunk, and
    the result must be a tuple whose first three items are bytes, an int
    and a bool, in that order.
    """
    frame = lz4frame.compress(b'', return_bytearray=False)
    context = lz4frame.create_decompression_context()
    result = lz4frame.decompress_chunk(context, frame, return_bytearray=False)

    assert isinstance(result, tuple)
    # Checked positionally, in order, exactly like individual index asserts.
    for position, expected_type in enumerate((bytes, int, bool)):
        assert isinstance(result[position], expected_type)
def test_create_decompression_context():
    """Creating a decompression context must yield something other than None."""
    context = lz4frame.create_decompression_context()
    # Identity check: `!= None` would go through the object's own comparison
    # methods, whereas `is not None` tests for the None singleton directly.
    assert context is not None
compression_level, auto_flush):
c_context = lz4frame.create_compression_context()
compressed = lz4frame.compress_begin(
c_context,
source_size=len(data),
compression_level=compression_level,
block_size=block_size,
content_checksum=content_checksum,
frame_type=frame_type,
auto_flush=auto_flush
)
compressed += lz4frame.compress_update(
c_context,
data)
compressed += lz4frame.compress_end(c_context)
d_context = lz4frame.create_decompression_context()
decompressed, bytes_read = lz4frame.decompress(d_context, compressed)
assert bytes_read == len(compressed)
assert decompressed == data
finally:
del data_in
compressed += lz4frame.compress_flush(c_context)
get_frame_info_check(
compressed,
len(data),
store_size,
block_size,
block_linked,
content_checksum,
block_checksum,
)
d_context = lz4frame.create_decompression_context()
compressed_in = get_chunked(compressed, d_chunks)
decompressed = b''
bytes_read = 0
eofs = []
try:
while True:
d, b, e = lz4frame.decompress_chunk(
d_context,
next(compressed_in),
)
decompressed += d
bytes_read += b
eofs.append(e)
except StopIteration:
pass