# tsinfer test methods (context: a unittest.TestCase subclass; uses the
# os, tempfile, lmdb, and numpy-as-np modules plus tsinfer's formats module).
def test_too_small_max_file_size_init(self):
    with tempfile.TemporaryDirectory(prefix="tsinf_format_test") as tempdir:
        # Fail immediately if max_file_size is so small we can't even create a file
        filename = os.path.join(tempdir, "samples.tmp")
        self.assertRaises(
            lmdb.MapFullError,
            formats.SampleData,
            path=filename,
            sequence_length=1,
            max_file_size=1,
        )
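For reference, the failure mode this test exercises can be reproduced with py-lmdb directly. A minimal sketch (the path and sizes are illustrative, not from the test above): a value that cannot fit in the memory map raises lmdb.MapFullError.

import lmdb

env = lmdb.open("/tmp/tiny.lmdb", map_size=2 ** 20)  # 1 MiB map; illustrative
try:
    with env.begin(write=True) as txn:
        txn.put(b"k", b"x" * (2 ** 21))  # a 2 MiB value cannot fit in a 1 MiB map
except lmdb.MapFullError as e:
    print("map full:", e)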
def _write_to_lmdb(db, key, value):
    """
    Write (key, value) to db, doubling the map_size and retrying
    whenever the write fails with MapFullError.
    """
    success = False
    while not success:
        txn = db.begin(write=True)
        try:
            txn.put(key, value)
            txn.commit()
            success = True
        except lmdb.MapFullError:
            txn.abort()
            # Double the map_size and try again
            curr_limit = db.info()['map_size']
            new_limit = curr_limit * 2
            db.set_mapsize(new_limit)
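A usage sketch for the helper above, assuming a py-lmdb Environment; the path and the deliberately small initial map_size are illustrative, chosen so the grow-and-retry loop is actually exercised:

import lmdb

env = lmdb.open("/tmp/example.lmdb", map_size=2 ** 16)  # illustrative; 64 KiB
for i in range(1000):
    _write_to_lmdb(env, b"key-%06d" % i, b"x" * 1024)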
def test_too_small_max_file_size_add(self):
    with tempfile.TemporaryDirectory(prefix="tsinf_format_test") as tempdir:
        base_size = 2 ** 16  # Big enough to allow the initial file to be created
        # Fail while adding a large amount of data
        with self.assertRaises(lmdb.MapFullError):
            filename = os.path.join(tempdir, "samples.tmp")
            with formats.SampleData(
                path=filename, sequence_length=1, max_file_size=base_size
            ) as small_sample_file:
                small_sample_file.add_site(
                    0,
                    alleles=["0", "1"],
                    genotypes=np.zeros(base_size, dtype=np.int8),
                )
        # Work around https://github.com/tskit-dev/tsinfer/issues/201
        small_sample_file.data.store.close()
def write_datums(self, db, batch):
    try:
        with db.begin(write=True) as lmdb_txn:
            for key, datum in batch:
                lmdb_txn.put(key, datum)
    except lmdb.MapFullError:
        # Double the map_size; Environment.set_mapsize() needs py-lmdb >= 0.87
        curr_limit = db.info()['map_size']
        new_limit = curr_limit * 2
        try:
            db.set_mapsize(new_limit)
        except AttributeError:
            version = tuple(int(x) for x in lmdb.__version__.split('.'))
            if version < (0, 87):
                raise ValueError(
                    'py-lmdb is out of date (%s vs 0.87)' % lmdb.__version__
                )
            raise
        # Retry the whole batch; the failed transaction was aborted on exit
        self.write_datums(db, batch)
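Because each retry doubles the map, the recursion depth stays logarithmic in the final map size; still, the same grow-and-retry pattern can be written iteratively. A sketch under the same assumptions (a py-lmdb Environment `db`, an iterable `batch` of (key, value) byte pairs); write_datums_iterative is an illustrative name, not from the source:

def write_datums_iterative(db, batch):
    # Same idea as above, as a loop: the `with` block aborts the failed
    # transaction automatically, then we grow the map and replay the batch.
    batch = list(batch)  # materialise so the batch can be replayed
    while True:
        try:
            with db.begin(write=True) as lmdb_txn:
                for key, datum in batch:
                    lmdb_txn.put(key, datum)
            return
        except lmdb.MapFullError:
            db.set_mapsize(db.info()['map_size'] * 2)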
def _write_batch_to_lmdb(db, batch):
    """
    Write a batch of (key, datum) pairs to db, serializing each datum
    and doubling the map_size on MapFullError.
    """
    try:
        with db.begin(write=True) as lmdb_txn:
            for key, datum in batch:
                lmdb_txn.put(key, datum.SerializeToString())
    except lmdb.MapFullError:
        # Double the map_size; Environment.set_mapsize() needs py-lmdb >= 0.87
        curr_limit = db.info()['map_size']
        new_limit = curr_limit * 2
        try:
            db.set_mapsize(new_limit)
        except AttributeError:
            version = tuple(int(x) for x in lmdb.__version__.split('.'))
            if version < (0, 87):
                raise ImportError(
                    'py-lmdb is out of date (%s vs 0.87)' % lmdb.__version__
                )
            raise
        # Try again with the doubled map
        _write_batch_to_lmdb(db, batch)
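The SerializeToString() call suggests the datums are protobuf messages; this pattern is common in Caffe-style dataset converters. A hedged usage sketch, assuming Caffe's Datum message is available (the path, map size, keys, and labels are all illustrative):

import lmdb
from caffe.proto import caffe_pb2  # assumption: Caffe-style protobuf datums

env = lmdb.open("/tmp/train_lmdb", map_size=2 ** 20)  # illustrative
batch = []
for i in range(256):
    datum = caffe_pb2.Datum()
    datum.label = i % 10
    batch.append((b"%08d" % i, datum))
_write_batch_to_lmdb(env, batch)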
def putmulti(self, kvpairs, dupdata=True, append=False, db=None):
    # Reconstructed signature: the original snippet begins mid-method,
    # inside a transaction-logging LMDB wrapper class.
    # Log playback isn't compatible with generators
    if not isinstance(kvpairs, list):
        kvpairs = list(kvpairs)
    realdb, dupsort = self.dbnames.get(db, (None, False))
    try:
        self.dirty = True
        if not self.recovering:
            self._logXactOper(self.putmulti, kvpairs,
                              dupdata=dupdata, append=append, db=db)
        with self.xact.cursor(db=realdb) as curs:
            return curs.putmulti(kvpairs, dupdata=dupdata, append=append)
    except lmdb.MapFullError:
        return self._handle_mapfull()
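The recovery itself is deferred to a _handle_mapfull() helper whose body is not shown in the snippet. A purely hypothetical sketch of such a helper, assuming the wrapper keeps its environment on self.env, its open write transaction on self.xact, and a log of (func, args, kwargs) operations to replay:

def _handle_mapfull(self):
    # Hypothetical sketch, not the original implementation: abort the
    # current transaction, double the map, restart the transaction, and
    # replay the logged operations with `recovering` set so they are
    # not logged a second time.
    self.xact.abort()
    self.env.set_mapsize(self.env.info()['map_size'] * 2)
    self.xact = self.env.begin(write=True)
    self.recovering = True
    try:
        for func, args, kwargs in self.xact_log:  # hypothetical operation log
            func(*args, **kwargs)
    finally:
        self.recovering = False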
# Fragment of a sample-writing method: the original snippet begins inside
# a try block with an open write transaction `txn`, looping over the
# (key, obj) items of one sample.
        if not isinstance(obj, np.ndarray):
            obj = np.array(obj)
        # Pack each value with msgpack
        msg_pkgs[key] = msgpack.packb(obj, use_bin_type=True, default=encode_data)
    # LMDB key: the sample number as a string padded with leading zeros
    key = encode_str("{:010}".format(self.nb_samples))
    # Construct the final msgpack and store it in the LMDB
    pkg = msgpack.packb(msg_pkgs, use_bin_type=True)
    txn.put(key, pkg)
    # Increase the global sample counter
    self.nb_samples += 1
except lmdb.MapFullError as e:
    raise AttributeError(
        "The LMDB `map_size` is too small: %s MB, %s" % (self.map_size_limit, e)
    )
# Write the current number of samples to `meta_db`, just in case
self.set_meta_str(NB_SAMPLES, self.nb_samples)
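Unlike the grow-and-retry helpers above, this writer treats MapFullError as fatal and asks for a larger map up front. A sketch of that sizing step, assuming (as the error message implies) that map_size_limit is given in megabytes; the path and budget are illustrative:

import lmdb

map_size_limit = 1024  # illustrative budget of 1 GiB, expressed in MB
env = lmdb.open("/tmp/dataset.lmdb",  # illustrative path
                map_size=map_size_limit * 1024 ** 2)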
def put_or_grow(txn, key, value):
    # Closes over `db` and `logger` from the enclosing scope. Note that
    # aborting the transaction discards any puts made earlier in it, so
    # the caller must treat the returned txn as the only live handle.
    try:
        txn.put(key, value)
        return txn
    except lmdb.MapFullError:
        pass
    # The put failed: abort, double the map_size, and retry in a fresh txn
    txn.abort()
    curr_size = db.info()['map_size']
    new_size = curr_size * 2
    logger.info("Doubling LMDB map_size to {:.2f}GB".format(new_size / 10 ** 9))
    db.set_mapsize(new_size)
    txn = db.begin(write=True)
    txn = put_or_grow(txn, key, value)
    return txn
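A usage sketch for put_or_grow, assuming it is defined as above in a scope where db and logger exist; the key and payload are illustrative. The caller must keep the returned transaction, since a retry replaces it:

payload = b"\x00" * 4096  # illustrative value
txn = db.begin(write=True)
txn = put_or_grow(txn, b"sample-0001", payload)
txn.commit()

Note that a mid-batch abort silently drops earlier puts from the same transaction, so this helper is safest when each transaction holds a single record, as above.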
def _try_put(self):
    """Try to put the buffered (key, value) pairs into the transaction.

    Starting with a small map_size and growing it on demand is a trick to
    avoid reserving ~1TB of disk space up front, which the memory map
    would require on an NTFS partition.

    Returns
    -------
    None
    """
    for pair in self._buffer:
        key, value = pair
        try:
            self.txn.put(key, value)
        except lmdb.MapFullError:
            new_size = self.env.info()['map_size'] * 2
            print('doubling LMDB map size to %d MB' % (new_size >> 20))
            self.txn.abort()
            self.env.set_mapsize(new_size)
            self.txn = self.env.begin(write=True)
            self._try_put()
            return  # the recursive call has already replayed the whole buffer
    self._cur_put = 0
    self._buffer = []
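A sketch of the surrounding class this method implies; the class name, method shapes, and sizes here are hypothetical, inferred only from the attributes the snippet uses:

import lmdb

class BufferedLMDBWriter:  # hypothetical wrapper around _try_put above
    def __init__(self, path, write_frequency=4096):
        # Start with a modest map; _try_put grows it on demand, which
        # avoids pre-allocating a huge file on filesystems like NTFS.
        self.env = lmdb.open(path, map_size=1 << 24)  # 16 MiB to start
        self.txn = self.env.begin(write=True)
        self._buffer = []
        self._cur_put = 0
        self._write_frequency = write_frequency

    # _try_put from the snippet above would be defined here.

    def put(self, key, value):
        self._buffer.append((key, value))
        self._cur_put += 1
        if self._cur_put >= self._write_frequency:
            self._try_put()           # flush, growing the map if needed
            self.txn.commit()         # persist the batch
            self.txn = self.env.begin(write=True)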