Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
res2 = ('x', NULL, 'y', 2, 'z', 10)
assert foo(10,y=2,z=10) == res2
assert foo.valid() == True
res1 = ('x', NULL, 'y', 1, 'z', 10)
assert foo(0,1,z=10) == res1
assert foo.valid() == True
assert foo.call() == 11
h = hashmap(algorithm='md5')
foo.register(h)
if hex(sys.hexversion) < '0x30300f0':
_hash1 = '2c8d801f4078eba873a5fb6909ab0f8d'
_hash2 = '949883b97d9fda9c8fe6bd468fe90af9'
else: # python 3.3 has hash randomization, apparently
from klepto.crypto import hash
_hash1 = hash(res1, 'md5')
_hash2 = hash(res2, 'md5')
assert foo(0,1,z=10) == _hash1
assert str(foo.keymap()) == str(h)
assert foo.key() == _hash1
assert foo(10,y=1,z=10) == _hash1
assert foo(10,y=2,z=10) == _hash2
assert foo.valid() == False
res2 = ('x', NULL, 'y', 2, 'z', 10)
assert foo(10,y=2,z=10) == res2
assert foo.valid() == True
res1 = ('x', NULL, 'y', 1, 'z', 10)
assert foo(0,1,z=10) == res1
assert foo.valid() == True
assert foo.call() == 11
h = hashmap(algorithm='md5')
foo.register(h)
if hex(sys.hexversion) < '0x30300f0':
_hash1 = '2c8d801f4078eba873a5fb6909ab0f8d'
_hash2 = '949883b97d9fda9c8fe6bd468fe90af9'
else: # python 3.3 has hash randomization, apparently
from klepto.crypto import hash
_hash1 = hash(res1, 'md5')
_hash2 = hash(res2, 'md5')
assert foo(0,1,z=10) == _hash1
assert str(foo.keymap()) == str(h)
assert foo.key() == _hash1
assert foo(10,y=1,z=10) == _hash1
assert foo(10,y=2,z=10) == _hash2
def encode(self, *args, **kwds):
"""use a flattened scheme for generating a key"""
return hash(keymap.encode(self, *args, **kwds), algorithm=self.__type__, **self._config)
def encrypt(self, *args, **kwds):
def __save__(self, memo=None, new=True):
"""create an archive from the given dictionary"""
if memo == None: return
filename = self.__state__['id']
_filename = os.path.join(os.path.dirname(os.path.abspath(filename)), TEMP+hash(random(), 'md5')) if new else filename
# create a temporary file, and dump the results
try:
f = hdf.File(_filename, 'w' if new else 'a')
for k,v in getattr(memo, 'iteritems', memo.items)():
#self._attrs(f).update({self._dumpkey(k): self._dumpval(v)})
_f = self._attrs(f)
_k = self._dumpkey(k)
_f.pop(_k,None)
_f[_k] = self._dumpval(v)
except OSError:
f = None
"failed to populate file for %s" % str(filename)
finally:
if f is not None: f.close()
if not new: return
# move the results to the proper place
def _store(self, key, value, input=False):
"store output (and possibly input) in a subdirectory"
_key = TEMP+hash(random(), 'md5')
# create an input file when key is not suitable directory name
if self._fname(key) != key: input=True #XXX: errors if protocol=0,1?
# create a temporary directory, and dump the results
try:
_file = os.path.join(self._mkdir(_key), self._file)
if input: _args = os.path.join(self._getdir(_key), self._args)
adict = {'serialized':self.__state__['serialized'],\
'protocol':self.__state__['protocol'],\
'meta':self.__state__['meta']}
#XXX: assumes one entry per file; ...could use name as key?
memo = hdf_archive(_file, **adict)
memo[None] = value
if input:
memo = hdf_archive(_args, **adict)
memo[None] = key
except (OSError,TypeError):
def _fname(self, key):
"generate suitable filename for a given key"
# special handling for pickles; enable non-strings (however 1=='1')
try: ispickle = key.startswith(PROTO) and key.endswith(STOP)
except: ispickle = False #FIXME: protocol 0,1 don't startwith(PROTO)
return hash(key, 'md5') if ispickle else str(key) #XXX: always hash?
#XXX: special handling in ispickle for protocol=json?
def _store(self, key, value, input=False):
"store output (and possibly input) in a subdirectory"
_key = TEMP+hash(random(), 'md5')
# create an input file when key is not suitable directory name
if self._fname(key) != key: input=True #XXX: errors if protocol=0,1?
# create a temporary directory, and dump the results
try:
_file = os.path.join(self._mkdir(_key), self._file)
if input: _args = os.path.join(self._getdir(_key), self._args)
if self.__state__['serialized']:
protocol = self.__state__['protocol']
if self.__state__['fast']:
protocol = None if type(protocol) is str else protocol
compression = self.__state__['compression']
_pickle.dump(value, _file, compress=compression,
protocol=protocol)
if input: _pickle.dump(key, _args, compress=compression,
protocol=protocol)
else:
def __save__(self, memo=None):
"""create an archive from the given dictionary"""
if memo == None: return
filename = self.__state__['id']
_filename = os.path.join(os.path.dirname(os.path.abspath(filename)), TEMP+hash(random(), 'md5'))
# create a temporary file, and dump the results
try:
if self.__state__['serialized']:
protocol = self.__state__['protocol']
if type(protocol) is str: #XXX: assumes 'json'
pik,mode,kwd = json,'w',{}
else: #XXX: byref=True ?
pik,mode,kwd = dill,'wb',{'protocol':protocol}
with open(_filename, mode) as f:
pik.dump(memo, f, **kwd)
else: #XXX: likely_import for each item in dict... ?
from .tools import _b
open(_filename, 'wb').write(_b('memo = %s' % repr(memo)))
except OSError:
"failed to populate file for %s" % str(filename)
# move the results to the proper place