Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_exceptions():
data = c.compress(b'1010101010100000000000000000000000000000000000000000000000000000000011111111111111111111111111111')
data = data[0:16]
with pytest.raises(Exception) as e:
c.decompress(data)
assert("decompressor wrote" in str(e.value).lower() or "corrupt input at" in str(e.value).lower() or "decompression failed: corrupt input" in str(e.value).lower())
data = c.compress(b'1010101010100000000000000000000000000000000000000000000000000000000011111111111111111111111111111')
data = [data[0:16] for x in (1, 2, 3)]
with pytest.raises(Exception) as e:
c.decompress_array(data)
assert ("decompressor wrote" in str(e.value).lower() or "corrupt input at" in str(e.value).lower() or "decompression failed: corrupt input" in str(e.value).lower())
def test_compress_empty_string():
assert(decompress(compress(b'')) == b'')
[c.compress, c.compressHC],
ids=('arctic', 'arcticHC'))
def test_roundtrip(compress):
_str = b"hello world"
cstr = compress(_str)
assert _str == c.decompress(cstr)
def test_roundtrip_multi(n):
_str = random_string(n)
cstr = c.compress(_str)
assert _str == c.decompress(cstr)
def bench_single(repeats, _strarr, use_HC):
# Arctic compress single
measurements = []
for i in range(repeats):
now = dt.now()
if use_HC:
res = [c.compressHC(x) for x in _strarr]
else:
res = [c.compress(x) for x in _strarr]
sample = (dt.now() - now).total_seconds()
assert all(res)
measurements.append(sample)
return measurements
# if one exists let's create the index on it
if idx_col is not None:
new_segments = np.array(new_segments, dtype='i8')
last_rows = recarr[new_segments - start]
# create numpy index
index = np.core.records.fromarrays([last_rows[idx_col]]
+ [new_segments, ],
dtype=INDEX_DTYPE)
# append to existing index if exists
if existing_index:
# existing_index_arr is read-only but it's never written to
existing_index_arr = np.frombuffer(decompress(existing_index), dtype=INDEX_DTYPE)
if start > 0:
existing_index_arr = existing_index_arr[existing_index_arr['index'] < start]
index = np.concatenate((existing_index_arr, index))
return Binary(compress(index.tostring()))
elif existing_index:
raise ArcticException("Could not find datetime64 index in item but existing data contains one")
return None