# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Highest container-format version this code understands; 255 is the largest
# value representable in a single unsigned byte.
MAX_FORMAT_VERSION = 255
# Maximum number of chunks; the largest value of a signed 64-bit integer.
MAX_CHUNKS = (2**63)-1
MAX_META_SIZE = (2**32-1) # uint32 max val
# Valid bounds for the Blosc compression level argument.
MIN_CLEVEL = 0
MAX_CLEVEL = 9
# lookup table for human readable sizes
SUFFIXES = OrderedDict((
("B", 2**0 ),
("K", 2**10),
("M", 2**20),
("G", 2**30),
("T", 2**40)))
# Codecs available from Blosc
CNAME_AVAIL = blosc.compressor_list()
# Numeric id -> codec name; presumably the codec id stored in the Blosc
# header — verify against the Blosc format spec.
CNAME_MAPPING = {
0: 'blosclz',
1: 'lz4',
2: 'snappy',
3: 'zlib',
4: 'zstd',
}
# Default Blosc compression arguments, keyed by argument name.
# NOTE(review): this line was a truncated fragment — only the value tuple of
# a dict(zip(...)) survived. Reconstructed by analogy with
# DEFAULT_METADATA_ARGS below; assumes BLOSC_ARGS and the DEFAULT_* constants
# are defined earlier in the file — TODO confirm.
DEFAULT_BLOSC_ARGS = dict(zip(BLOSC_ARGS,
        (DEFAULT_TYPESIZE, DEFAULT_CLEVEL, DEFAULT_SHUFFLE, DEFAULT_CNAME)))
# metadata args
METADATA_ARGS = ('magic_format', 'meta_checksum', 'meta_codec', 'meta_level', 'max_meta_size')
_METADATA_ARGS_SET = set(METADATA_ARGS) # cached
DEFAULT_MAGIC_FORMAT = 'JSON'
DEFAULT_META_CHECKSUM = 'adler32'
DEFAULT_META_CODEC = 'zlib'
DEFAULT_META_LEVEL = 6
DEFAULT_MAX_META_SIZE = lambda x: 10 * x
DEFAULT_METADATA_ARGS = dict(zip(METADATA_ARGS,
(DEFAULT_MAGIC_FORMAT, DEFAULT_META_CHECKSUM,
DEFAULT_META_CODEC, DEFAULT_META_LEVEL, DEFAULT_MAX_META_SIZE)))
# NOTE: the duplicate re-definitions of CNAME_AVAIL and SUFFIXES that used to
# sit here were removed — both are already defined (byte-identically) earlier
# in this file; re-executing them only repeated the same work.
# verbosity levels
NORMAL = 'NORMAL'
VERBOSE = 'VERBOSE'
DEBUG = 'DEBUG'
# Currently active verbosity level.
LEVEL = NORMAL
# All recognised levels, in increasing order of chattiness.
VERBOSITY_LEVELS = (NORMAL, VERBOSE, DEBUG)
# Baseline: time a raw ctypes.memmove() of the first array to measure plain
# memory bandwidth, which compression throughput is compared against.
in_ = arrays[0][0]
# cause page faults here
out_ = np.full(in_.size, fill_value=0, dtype=in_.dtype)
t0 = time.time()
#out_ = np.copy(in_)
out_ = ctypes.memmove(out_.__array_interface__['data'][0],
in_.__array_interface__['data'][0], N*8)
tcpy = time.time() - t0
# NOTE(review): N*8 assumes 8-byte (int64) elements — confirm against how
# `arrays` is built by the caller.
print(" *** ctypes.memmove() *** Time for memcpy():\t%.3f s\t(%.2f GB/s)" % (
tcpy, (N*8 / tcpy) / 2**30))
print("\nTimes for compressing/decompressing with clevel=%d and %d threads" % (
clevel, blosc.ncores))
# Benchmark every available compressor with each shuffle mode over all test
# arrays, timing compress_ptr/decompress_ptr and reporting GB/s and ratio.
# NOTE(review): the loop bodies below had lost their indentation (extraction
# artifact); the structure is restored here. Loop variable renamed from
# `filter` to `shuffle_mode` to avoid shadowing the builtin.
for (in_, label) in arrays:
    print("\n*** %s ***" % label)
    for cname in blosc.compressor_list():
        for shuffle_mode in [blosc.NOSHUFFLE, blosc.SHUFFLE, blosc.BITSHUFFLE]:
            t0 = time.time()
            c = blosc.compress_ptr(in_.__array_interface__['data'][0],
                                   in_.size, in_.dtype.itemsize,
                                   clevel=clevel, shuffle=shuffle_mode, cname=cname)
            tc = time.time() - t0
            # cause page faults here
            out = np.full(in_.size, fill_value=0, dtype=in_.dtype)
            t0 = time.time()
            blosc.decompress_ptr(c, out.__array_interface__['data'][0])
            td = time.time() - t0
            # Round-trip sanity check before reporting timings.
            assert((in_ == out).all())
            print(" *** %-8s, %-10s *** %6.3f s (%.2f GB/s) / %5.3f s (%.2f GB/s)" % (
                cname, blosc.filters[shuffle_mode], tc, ((N*8 / tc) / 2**30), td, ((N*8 / td) / 2**30)), end='')
            print("\tCompr. ratio: %5.1fx" % (N*8. / len(c)))
# Print the Blosc/Python version banner, then build the test data.
blosc.print_versions()
print("Creating a large NumPy array with 10**%d int64 elements:" % Nexp)
in_ = np.arange(N, dtype=np.int64) # the trivial linear distribution
#in_ = np.linspace(0, 100, N) # another linear distribution
#in_ = np.random.random_integers(0, 100, N) # random distribution
print(" ", in_)
# Time a plain np.copy() as the memory-bandwidth baseline to beat.
tic = time.time()
out_ = np.copy(in_)
toc = time.time()
print(" Time for copying array with np.copy(): %.3f s" % (toc-tic,))
print()
# For each available compressor: time the high-level pack_array/unpack_array
# round trip, then the lower-overhead pointer-based compress_ptr API.
# NOTE(review): the body of this `for` loop appears to have lost its
# indentation (paste/extraction artifact) — as written this is not valid
# Python. The statements below (through the compress_ptr call, which
# continues past this excerpt) need to be re-indented under the loop.
for cname in blosc.compressor_list():
print("Using *** %s *** compressor::" % cname)
ctic = time.time()
c = blosc.pack_array(in_, clevel=clevel, shuffle=True, cname=cname)
ctoc = time.time()
dtic = time.time()
out = blosc.unpack_array(c)
dtoc = time.time()
# Round-trip sanity check.
assert((in_ == out).all())
print(" Time for pack_array/unpack_array: %.3f/%.3f s." % \
(ctoc-ctic, dtoc-dtic), end='')
print("\tCompr ratio: %.2f" % (in_.size*in_.dtype.itemsize*1. / len(c)))
# Pointer-based API: skips array metadata handling for a purer timing.
ctic = time.time()
c = blosc.compress_ptr(in_.__array_interface__['data'][0],
in_.size, in_.dtype.itemsize,
clevel=clevel, shuffle=True, cname=cname)