Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __new__(null_archive, name=None, dict=None, cached=True, **kwds):
    """build a dictionary interface to a permanently-empty archive backend

    Inputs:
        name: (optional) identifier string [default: None]
        dict: initial dictionary to seed the archive
        cached: if True, use an in-memory cache interface to the archive

    Any additional keywords are accepted but not used here.
    """
    seed = {} if dict is None else dict
    backend = _null_archive()
    # the identifier is stored as a string; None means "no identifier"
    backend.__state__['id'] = str(name) if name is not None else None
    if cached:
        # hand back the cache interface instead of the bare backend
        backend = cache(archive=backend)
    backend.update(seed)
    return backend
def copy(self, name=None): #XXX: always None? or allow other settings?
    """D.copy(name) -> a new null_archive built for the given name

    When name is None, the id recorded in D.__state__['id'] is used instead.
    """
    target = self.__state__['id'] if name is None else name
    return null_archive(__magic_key_0192837465__=target)
# interface
def archived(self, *on):
    """check if the cache is archived, or toggle archiving

    If on is True, turn on the archive; if on is False, turn off the archive

    Called with no argument, return True when self.archive is not a
    null_archive, and False otherwise. Called with one argument, toggle
    by exchanging self.__swap__ and self.__archive__, and return None.

    Raises:
        TypeError: if more than one argument is given
        ValueError: if asked to turn on the archive while both
            self.__swap__ and self.__archive__ are null_archive instances
    """
    nargs = len(on)
    if not nargs:
        return not isinstance(self.archive, null_archive)
    if nargs > 1:
        # report the number of arguments actually passed (self excluded),
        # so the count agrees with the "at most 1 argument" wording
        raise TypeError("archived expected at most 1 argument, got %s" % nargs)
    if bool(on[0]):
        if not isinstance(self.__swap__, null_archive):
            # a real archive is parked in the swap slot: bring it back
            self.__swap__, self.__archive__ = self.__archive__, self.__swap__
        elif isinstance(self.__archive__, null_archive):
            # neither slot holds a real archive, so there is nothing to enable
            raise ValueError("no valid archive has been set")
    else:
        if not isinstance(self.__archive__, null_archive):
            # park the real archive in the swap slot
            self.__swap__, self.__archive__ = self.__archive__, self.__swap__
def sync(self, clear=False):
def __init__(self, *args, **kwds):
    """set up the dictionary and attach its archive backend

    Additional Inputs:
        archive: instance of archive object; when not given, a new
            null_archive() is used

    All other positional and keyword arguments are passed to dict.__init__.
    """
    self.__swap__ = null_archive()
    # the fallback is constructed whether or not 'archive' was supplied
    fallback = null_archive()
    self.__archive__ = kwds.pop('archive', fallback)
    dict.__init__(self, *args, **kwds)
def __repr__(self):
def __init__(self, *args, **kwds):
    """set up the dictionary and attach its archive backend

    Additional Inputs:
        archive: instance of archive object; when not given, a new
            null_archive() is used

    All other positional and keyword arguments are passed to dict.__init__.
    """
    self.__swap__ = null_archive()
    # the fallback is constructed whether or not 'archive' was supplied
    fallback = null_archive()
    self.__archive__ = kwds.pop('archive', fallback)
    dict.__init__(self, *args, **kwds)
def __repr__(self):
def drop(self): #XXX: sync first?
    """replace the current archive with a NULL archive"""
    # archiving is switched on first; archived(True) may raise ValueError
    #XXX: should not throw error if not archived?
    self.archived(True)
    self.archive = null_archive()
def open(self, archive):
archive = _dict_archive()
archive.__state__['id'] = None if name is None else str(name)
if cached: archive = cache(archive=archive)
archive.update(dict)
return archive
@classmethod
def from_frame(dict_archive, dataframe):
    """convert a dataframe to an archive by way of _from_frame

    The dataframe is copied and the copy's columns.name is set to this
    class's __name__ before conversion. If an AttributeError is raised
    while doing so, the remaining tagging is skipped and conversion
    proceeds with whatever object is held at that point.
    """
    frame = dataframe
    try:
        frame = frame.copy()
        frame.columns.name = dict_archive.__name__
    except AttributeError:
        # best effort: not every input supports copy() or columns
        pass
    return _from_frame(frame)
pass
class null_archive(_null_archive):
def __new__(null_archive, name=None, dict=None, cached=True, **kwds):
    """build a dictionary interface to a permanently-empty archive backend

    Inputs:
        name: (optional) identifier string [default: None]
        dict: initial dictionary to seed the archive
        cached: if True, use an in-memory cache interface to the archive

    Any additional keywords are accepted but not used here.
    """
    seed = {} if dict is None else dict
    backend = _null_archive()
    # the identifier is stored as a string; None means "no identifier"
    backend.__state__['id'] = str(name) if name is not None else None
    if cached:
        # hand back the cache interface instead of the bare backend
        backend = cache(archive=backend)
    backend.update(seed)
    return backend
@classmethod