Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_encoding(tmpdir):
japanese_doc = {"Test": u"こんにちは世界"}
path = str(tmpdir.join('test.db'))
# cp936 is used for japanese encodings
jap_storage = JSONStorage(path, encoding="cp936")
jap_storage.write(japanese_doc)
try:
exception = json.decoder.JSONDecodeError
except AttributeError:
exception = ValueError
with pytest.raises(exception):
# cp037 is used for english encodings
eng_storage = JSONStorage(path, encoding="cp037")
eng_storage.read()
jap_storage = JSONStorage(path, encoding="cp936")
assert japanese_doc == jap_storage.read()
def test_create_dirs():
temp_dir = tempfile.gettempdir()
while True:
dname = os.path.join(temp_dir, str(random.getrandbits(20)))
if not os.path.exists(dname):
db_dir = dname
db_file = os.path.join(db_dir, 'db.json')
break
with pytest.raises(IOError):
JSONStorage(db_file)
JSONStorage(db_file, create_dirs=True).close()
assert os.path.exists(db_file)
# Use create_dirs with already existing directory
JSONStorage(db_file, create_dirs=True).close()
assert os.path.exists(db_file)
os.remove(db_file)
os.rmdir(db_dir)
def db(self):
if self._db is None:
storage = CachingMiddleware(JSONStorage)
storage.WRITE_CACHE_SIZE = 50 # todo support for user specifying
self._db = TinyDB(self.source, storage=storage)
name = self.__class__.__name__
if "readonly" in name.lower():
# remove interface for inserting records making this a read only db
self._db.insert = None
else:
self.lock()
self._finish = weakref.finalize(self, self._close, self._db)
return self._db
from tinydb import TinyDB
from tinydb.storages import MemoryStorage, JSONStorage
class DB(TinyDB):
# To modify the default storage for all TinyDB instances
TinyDB.DEFAULT_STORAGE = MemoryStorage
pass
class MemStroage(MemoryStorage):
pass
class JsonStorage(JSONStorage):
pass
filepath is derived from the Period's name). Otherwise the data is
stored in memory.
Keyword args are passed to the TinyDB constructor. See the respective
docs for detailed information.
"""
super().__init__(name=name)
# evaluate args/kwargs for TinyDB constructor. This overwrites the
# 'storage' kwarg if explicitly passed
if data_dir is None:
args = []
kwargs["storage"] = storages.MemoryStorage
else:
args = [os.path.join(data_dir, "{}.json".format(self.name))]
kwargs["storage"] = storages.JSONStorage
self._db = TinyDB(*args, **kwargs)
self._create_category_cache()
def __init__(self):
"""Adds self to messages and event's `data` field.
Through this instance you can access TinyDB instance (data["tinydbproxy"].tinydb).
This plugin should be included first!
"""
super().__init__()
self.order = (-95, 95)
self.tinydb = TinyDB(path=self.get_path("tinydb_database.json"), storage=CachingMiddleware(JSONStorage))