Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_unpickle():
data = {'foo': 'bar', 'command': b'\xfa', 'entries': [b'\xfb', b'\xfc']}
python2_cpickle = b'\x80\x02}q\x01(U\x03fooq\x02U\x03barq\x03U\x07commandq\x04U\x01\xfaU\x07entriesq\x05]q\x06(U\x01\xfbU\x01\xfceu.'
python2_pickle = b'\x80\x02}q\x00(U\x03fooq\x01U\x03barq\x02U\x07commandq\x03U\x01\xfaq\x04U\x07entriesq\x05]q\x06(U\x01\xfbq\x07U\x01\xfcq\x08eu.'
python3_pickle = b'\x80\x02}q\x00(X\x03\x00\x00\x00fooq\x01X\x03\x00\x00\x00barq\x02X\x07\x00\x00\x00commandq\x03c_codecs\nencode\nq\x04X\x02\x00\x00\x00\xc3\xbaq\x05X\x06\x00\x00\x00latin1q\x06\x86q\x07Rq\x08X\x07\x00\x00\x00entriesq\t]q\n(h\x04X\x02\x00\x00\x00\xc3\xbbq\x0bh\x06\x86q\x0cRq\rh\x04X\x02\x00\x00\x00\xc3\xbcq\x0eh\x06\x86q\x0fRq\x10eu.'
python2_cpickle_data = pickle.loads(python2_cpickle)
assert data == python2_cpickle_data, 'Failed to unpickle data pickled by python2 cPickle'
python2_pickle_data = pickle.loads(python2_pickle)
assert data == python2_pickle_data, 'Failed to unpickle data pickled by python2 pickle'
python3_pickle_data = pickle.loads(python3_pickle)
assert data == python3_pickle_data, 'Failed to unpickle data pickled by python3 pickle'
if kwargs:
cmd = (funcID, args, kwargs)
elif args and not kwargs:
cmd = (funcID, args)
else:
cmd = funcID
sync = kwargs.pop('sync', False)
if callback is not None:
sync = False
if sync:
asyncResult = AsyncResult()
callback = asyncResult.onResult
timeout = kwargs.pop('timeout', None)
applier(pickle.dumps(cmd), callback, _COMMAND_TYPE.REGULAR)
if sync:
res = asyncResult.event.wait(timeout)
if not res:
raise SyncObjException('Timeout')
if not asyncResult.error == 0:
raise SyncObjException(asyncResult.error)
return asyncResult.result
def __processParseMessage(self):
if len(self.__readBuffer) < 4:
return None
l = struct.unpack('i', self.__readBuffer[:4])[0]
if len(self.__readBuffer) - 4 < l:
return None
data = self.__readBuffer[4:4 + l]
try:
if self.encryptor:
data = self.encryptor.decrypt(data)
message = pickle.loads(zlib.decompress(data))
if self.recvRandKey:
randKey, message = message
assert randKey == self.recvRandKey
except:
self.disconnect()
return None
self.__readBuffer = self.__readBuffer[4 + l:]
return message
self.__logger.debug(self.__index_configs[index_name].get_storage_type())
# create the index
if self.__index_configs[index_name].get_storage_type() == 'ram':
index = self.__ram_storage.create_index(self.__index_configs[index_name].get_schema(),
indexname=index_name)
else:
index = self.__file_storage.create_index(self.__index_configs[index_name].get_schema(),
indexname=index_name)
self.__indices[index_name] = index
self.__logger.info('{0} has created'.format(index_name))
# save the index config
with open(os.path.join(self.__file_storage.folder, self.get_index_config_file(index_name)),
'wb') as f:
f.write(pickle.dumps(index_config))
# open the index writer
self.__open_writer(index_name)
except Exception as ex:
self.__logger.error('failed to create {0}: {1}'.format(index_name, ex))
finally:
self.__record_metrics(start_time, 'create_index')
return index
def __serialize(self, filename, raft_data):
with self.__lock:
try:
self.__logger.info('serializer has started')
with zipfile.ZipFile(filename, 'w', zipfile.ZIP_DEFLATED) as f:
# store the federation data
f.writestr('federation.bin', pickle.dumps(self.__data))
self.__logger.debug('federation data has stored in {0}'.format(filename))
# store the raft data
f.writestr(RAFT_DATA_FILE, pickle.dumps(raft_data))
self.__logger.info('{0} has restored'.format(RAFT_DATA_FILE))
self.__logger.info('snapshot has created')
except Exception as ex:
self.__logger.error('failed to create snapshot: {0}'.format(ex))
finally:
self.__logger.info('serializer has stopped')
def setCodeVersion(self, newVersion, callback = None):
"""Switch to a new code version on all cluster nodes. You
should ensure that cluster nodes are updated, otherwise they
won't be able to apply commands.
:param newVersion: new code version
:type int
:param callback: will be called on cussess or fail
:type callback: function(`FAIL_REASON <#pysyncobj.FAIL_REASON>`_, None)
"""
assert isinstance(newVersion, int)
if newVersion > self.__selfCodeVersion:
raise Exception('wrong version, current version is %d, requested version is %d' % (self.__selfCodeVersion, newVersion))
if newVersion < self.__enabledCodeVersion:
raise Exception('wrong version, enabled version is %d, requested version is %d' % (self.__enabledCodeVersion, newVersion))
self._applyCommand(pickle.dumps(newVersion), callback, _COMMAND_TYPE.VERSION)
try:
if self.is_index_exist(index_name):
# open the index
index = self.__open_index(index_name, schema=schema)
else:
# create the index
index = self.__file_storage.create_index(schema, indexname=index_name)
self.__indices[index_name] = index
self.__logger.info('{0} was created'.format(index_name))
# open the index writer
self.__open_writer(index_name)
# save the schema in the index dir
with open(os.path.join(self.__file_storage.folder, '{0}.schema'.format(index_name)), 'wb') as f:
f.write(pickle.dumps(schema))
except Exception as ex:
self.__logger.error('failed to create {0}: {1}'.format(index_name, ex))
return index
def loads(data):
data = to_bytes(data)
try:
return pickle.loads(data)
except:
if is_py3:
return pickle._loads(data)
raise
def loads(data):
data = to_bytes(data)
try:
return pickle.loads(data)
except:
if is_py3:
return pickle._loads(data)
raise