import numpy as np
from typing import List
from lz4.block import compress, LZ4BlockError


def arrtolz4string_list(arr: np.ndarray) -> List[bytes]:
    """
    Converts a (multi-dimensional) array into a list of lz4-compressed byte strings.

    Args:
        arr: Input array.

    Returns:
        List of lz4-compressed byte strings.
    """
    if isinstance(arr, list):
        arr = np.array(arr)
    if len(arr) == 0:
        return [b""]
    try:
        str_lst = [compress(arr.tobytes())]
    # catch the ValueError which is thrown by the py3 lz4 version for oversized buffers
    except (OverflowError, ValueError, LZ4BlockError):
        # Buffer too large for a single lz4 block: split in half and recurse.
        half_ix = len(arr) // 2
        str_lst = arrtolz4string_list(arr[:half_ix]) + \
            arrtolz4string_list(arr[half_ix:])
    return str_lst
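For completeness, a minimal sketch of the reverse direction, assuming the same chunking scheme and a caller-supplied dtype/shape; the name lz4string_listtoarr and its parameters are assumptions for illustration, not part of the snippet above.

import numpy as np
from lz4.block import decompress


def lz4string_listtoarr(str_lst, dtype=np.float32, shape=None) -> np.ndarray:
    # Hypothetical inverse of arrtolz4string_list: decompress every chunk,
    # join the raw bytes and reinterpret them with the given dtype/shape.
    if len(str_lst) == 0 or str_lst == [b""]:
        return np.zeros((0,), dtype=dtype)
    raw = b"".join(decompress(chunk) for chunk in str_lst)
    arr = np.frombuffer(raw, dtype=dtype)
    return arr.reshape(shape) if shape is not None else arr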
        key.fNbytes += key.fObjlen
        key.write(keycursor, context._sink)
        cursor.write_data(context._sink, givenbytes)
elif algorithm == uproot.const.kLZ4:
    algo = b"L4"
    try:
        import xxhash
    except ImportError:
        raise ImportError("Install xxhash package with:\n pip install xxhash\nor\n conda install -c conda-forge python-xxhash")
    try:
        import lz4.block
    except ImportError:
        raise ImportError("Install lz4 package with:\n pip install lz4\nor\n conda install -c anaconda lz4")
    if level >= 4:
        after_compressed = lz4.block.compress(givenbytes, compression=level, mode="high_compression", store_size=False)
    else:
        after_compressed = lz4.block.compress(givenbytes, store_size=False)
    compressedbytes = len(after_compressed) + 8
    checksum = xxhash.xxh64(after_compressed).digest()
    if (compressedbytes + 9) < uncompressedbytes:
        c1 = (compressedbytes >> 0) & 0xff
        c2 = (compressedbytes >> 8) & 0xff
        c3 = (compressedbytes >> 16) & 0xff
        method = lz4.library_version_number() // (100 * 100)
        cursor.write_fields(context._sink, _header, algo, method, c1, c2, c3, u1, u2, u3)
        cursor.write_data(context._sink, checksum)
        cursor.write_data(context._sink, after_compressed)
        key.fObjlen = uncompressedbytes
        key.fNbytes = compressedbytes + key.fKeylen + 9
        key.write(keycursor, context._sink)
    else:
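For orientation: the algo/method/c1..c3/u1..u3 fields above form ROOT's 9-byte compressed-record header (a 2-byte algorithm tag, 1 method byte, and two 3-byte little-endian sizes). A standalone sketch of that packing, independent of uproot's cursor machinery; the helper name below is illustrative, not uproot API.

def pack_root_compression_header(algo: bytes, method: int,
                                 compressed: int, uncompressed: int) -> bytes:
    # Illustrative only: mirrors the c1/c2/c3 and u1/u2/u3 byte-splitting above.
    def three_le_bytes(n):
        return bytes(((n >> 0) & 0xff, (n >> 8) & 0xff, (n >> 16) & 0xff))
    return algo + bytes([method]) + three_le_bytes(compressed) + three_le_bytes(uncompressed)


# e.g. pack_root_compression_header(b"L4", 1, compressedbytes, uncompressedbytes)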
def roundtrip(x, c_kwargs, d_kwargs, dictionary):
    if dictionary:
        if isinstance(dictionary, tuple):
            d = x[dictionary[0]:dictionary[1]]
        else:
            d = dictionary
        c_kwargs['dict'] = d
        d_kwargs['dict'] = d

    c = lz4.block.compress(x, **c_kwargs)

    if c_kwargs['store_size']:
        # get_stored_size: helper defined elsewhere in the test suite that reads
        # the 4-byte little-endian size prefix written when store_size=True.
        assert get_stored_size(c) == len(x)
    else:
        d_kwargs['uncompressed_size'] = len(x)

    return lz4.block.decompress(c, **d_kwargs)
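A minimal invocation of this helper, assuming lz4.block is imported and get_stored_size is in scope; the payload is illustrative.

data = b"roundtrip me " * 64

# Size prefix stored by compress(), so decompress() needs no explicit size.
assert roundtrip(data, {'store_size': True}, {}, None) == data

# No stored size: the helper passes uncompressed_size to decompress() instead.
assert roundtrip(data, {'store_size': False}, {}, None) == data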
    lz4_compressHC = lambda _str: lz4_compress(_str, mode='high_compression')
except ImportError as e:
def test_upload_lz4_accept_lz4(self):
    self.call_api_with_compression(accept_encoding='lz4',
                                   content_encoding='lz4',
                                   decoding_fn=lz4.block.decompress,
                                   encoding_fn=lz4.block.compress,
                                   expected_encoding='lz4')
def extract_nso(path_in, path_out):
    with open(path_in, 'rb') as f:
        text_off = read_u32(f, 0x10)
        text_loc = read_u32(f, 0x14)
        text_size = read_u32(f, 0x18)
        text_compressed_size = read_u32(f, 0x60)

        # Round-trips a tiny payload, apparently as a sanity check that lz4 works.
        test_data = b'hello'
        lz4.block.decompress(lz4.block.compress(test_data))

        print('Text offset: {}'.format(text_off))
        print('Text compressed size: {}'.format(text_compressed_size))
        print('Text uncompressed size: {}'.format(text_size))

        compressed_patched_text = read_at(f, text_off, text_compressed_size)
        print(hx(compressed_patched_text)[0:10])
        text = lz4.block.decompress(compressed_patched_text, uncompressed_size=text_size)

        decompressed_hash = read_at(f, 0xA0, 0x20)
        calculated_hash = sha256(text)

        print('Compressed size: {}'.format(text_compressed_size))
        print('Decompressed hash: {}'.format(hx(decompressed_hash)))
        print('Calculated hash: {}'.format(hx(calculated_hash)))

        if decompressed_hash == calculated_hash:
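Both NSO helpers here rely on a few small utilities (read_u32, read_at, hx, sha256) that are not included in the snippet; a plausible sketch, where the names come from the calls above but the bodies are assumptions:

import struct
from binascii import hexlify as hx
import hashlib


def read_at(f, off, size):
    # Read `size` bytes at an absolute file offset.
    f.seek(off)
    return f.read(size)


def read_u32(f, off):
    # Little-endian unsigned 32-bit integer at `off` (assumed layout).
    return struct.unpack('<I', read_at(f, off, 4))[0]


def sha256(data):
    # Raw 32-byte digest, to match the 0x20-byte hash read from the header.
    return hashlib.sha256(data).digest()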
def repack_nso(path_original, path_patch, path_out):
    nso = b''
    patched_text_hash = b''

    # Read the original NSO
    with open(path_original, 'rb') as f:
        nso = bytearray(f.read())

    # Read the patched text
    with open(path_patch, 'rb') as f:
        data = f.read()
        patched_text_hash = sha256(data)
        compressed_patched_text = lz4.block.compress(data, store_size=False)

    text_off = up('
def compress(data):
    return lz4.block.compress(data, store_size=False)
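Since store_size=False omits the 4-byte size prefix, the caller must supply the original length at decompression time; a minimal round-trip sketch using the wrapper above (payload is illustrative):

import lz4.block

payload = b"some payload" * 100
packed = compress(payload)
restored = lz4.block.decompress(packed, uncompressed_size=len(payload))
assert restored == payload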
Serializes the array using Numpy's native serialization functionality and
compresses it using lz4's high-compression algorithm.

Numeric types are serialized to C format and should be relatively easy to
reverse-engineer in other languages (https://docs.scipy.org/doc/numpy/neps/npy-format.html).
Non-numeric types are stringified and serialized to MessagePack (http://msgpack.org/index.html).

:param arr: Numpy array
:return: LZ4-compressed binary blob
"""
if arr.dtype == np.dtype('O'):
    data = msgpack.dumps(list(map(str, arr)))
else:
    with io.BytesIO() as f:
        np.save(f, arr)
        f.seek(0)
        data = f.read()
blob = lz4.block.compress(data, mode='high_compression')
return blob
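A deserialization counterpart would invert those two branches; a sketch assuming the caller knows (for example from stored metadata) whether the blob carries an .npy payload or a MessagePack list of stringified values. The function name and the flag are illustrative, not part of the code above.

import io

import lz4.block
import msgpack
import numpy as np


def deserialize(blob, is_object_dtype=False):
    # Hypothetical inverse: decompress, then load either .npy bytes or msgpack strings.
    data = lz4.block.decompress(blob)
    if is_object_dtype:
        # Values were packed with msgpack.dumps(list(map(str, arr))).
        return np.array(msgpack.loads(data, raw=False), dtype=object)
    with io.BytesIO(data) as f:
        return np.load(f)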