Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_roundtrip_arr(n, length):
    """Assert that an array survives a compress/decompress round trip.

    Builds ``n`` strings via ``random_string(length)``, compresses them with
    ``c.compress_array`` and checks that ``c.decompress_array`` returns a
    value equal to the original list.
    """
    originals = []
    for _ in range(n):
        originals.append(random_string(length))
    compressed = c.compress_array(originals)
    restored = c.decompress_array(compressed)
    assert originals == restored
def test_compress_array():
    """Check that compressing a one-element list yields a non-empty list."""
    payload = [b"foobar" * 10]
    # Two separate calls, one per property being checked.
    assert len(compress_array(payload)) > 0
    assert isinstance(compress_array(payload), list)
def test_decompress_array():
    """Check that decompress_array inverts compress_array for 100 byte strings."""
    expected = []
    for i in range(100):
        expected.append(('foo%s' % i).encode('ascii'))
    roundtripped = decompress_array(compress_array(expected))
    assert roundtripped == expected
# NOTE(review): this is an excerpt from the middle of a larger function; the
# enclosing `def` and whatever follows the last line are not visible here, and
# the original indentation has been lost (the statements after `if`/`else`/
# `for` below were presumably nested) -- TODO confirm against the full source.
#
# Add the 'sha' of every document found for this symbol to the known-sha set.
symbol_all_previous_shas.update(Binary(x['sha']) for x in
collection.find({'symbol': symbol}, projection={'sha': 1, '_id': 0}))
length = len(item)
# Pick up the previous version's segment index only when the write starts at a
# non-zero offset and the previous version actually carries one.
if segment_offset > 0 and 'segment_index' in previous_version:
existing_index = previous_version['segment_index']
else:
existing_index = None
segment_index = []
# Compress
# Slice `item` into runs of at most `rows_per_chunk` rows, serialise each slice
# and compress them all with a single compress_array call.
# NOTE(review): `xrange` only exists on Python 2 (or via a compat import), and
# `.tostring()` is deprecated on NumPy arrays in favour of `.tobytes()` --
# presumably `item` is an ndarray; confirm before changing.
idxs = xrange(int(np.ceil(float(length) / rows_per_chunk)))
chunks = [(item[i * rows_per_chunk: (i + 1) * rows_per_chunk]).tostring() for i in idxs]
compressed_chunks = compress_array(chunks)
# Write
bulk = []
for i, chunk in zip(idxs, compressed_chunks):
# 'segment' is the index of the last row held in this chunk (clamped to the
# final row of `item`), shifted by `segment_offset`.
segment = {
'data': Binary(chunk),
'compressed': True,
'segment': min((i + 1) * rows_per_chunk - 1, length - 1) + segment_offset,
}
segment_index.append(segment['segment'])
# The checksum is taken before the 'sha' key is added to the segment dict.
sha = checksum(symbol, segment)
segment_spec = {'symbol': symbol, 'sha': sha, 'segment': segment['segment']}
# With forward pointers disabled, the sha is stored on the segment only when
# it is not already in the known-sha set; what happens next is outside this
# excerpt.
if ARCTIC_FORWARD_POINTERS_CFG is FwPointersCfg.DISABLED:
if sha not in symbol_all_previous_shas:
segment['sha'] = sha
def bench_multi(repeats, _strarr, use_HC, pool=None):
    """Time repeated compression of ``_strarr`` and return per-run durations.

    Args:
        repeats: Number of timed runs to perform.
        _strarr: Sequence of payloads handed to the compressor on every run.
        use_HC: When truthy, use the ``HC`` compression entry points.
        pool: Optional object exposing ``map(func, iterable)``. When truthy,
            each payload is compressed individually through ``pool.map`` with
            ``c.lz4_compress`` / ``c.lz4_compressHC``; otherwise the whole
            sequence is passed to ``c.compress_array`` /
            ``c.compressHC_array``.

    Returns:
        A list with one elapsed time in seconds (float) per run; empty when
        ``repeats`` is zero or negative.

    Raises:
        AssertionError: If a run returns a different number of results than
            there are payloads, or if any result is falsy.
    """
    # Local import keeps this block self-contained. perf_counter is monotonic
    # and high-resolution, so samples cannot go negative or jump when the
    # system clock is adjusted mid-benchmark (unlike datetime.now()).
    from time import perf_counter

    measurements = []
    for _ in range(repeats):
        started = perf_counter()
        if pool:
            # Raw LZ4 lib
            if use_HC:
                res = pool.map(c.lz4_compressHC, _strarr)
            else:
                res = pool.map(c.lz4_compress, _strarr)
        else:
            # Arctic's compression layer
            if use_HC:
                res = c.compressHC_array(_strarr)
            else:
                res = c.compress_array(_strarr, withHC=False)
        # Stop the clock before the sanity checks so they are not timed.
        sample = perf_counter() - started
        assert len(res) == len(_strarr)
        assert all(res)
        measurements.append(sample)
    return measurements