Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_size(self):
    """Deserialize the raw witness fixture and check the size it reports."""
    witness_reader = BinaryReader(StreamManager.GetStream(self.raw_witness))
    witness = Witness()
    witness.Deserialize(witness_reader)
    self.assertEqual(witness.Size(), self.expected_size)
def test_Deserialization(self):
    """Deserialize the raw verification contract and compare each field.

    Every attribute exposed by the deserialized contract is checked against
    the matching ``expected_*`` fixture value on the test case.
    """
    contract_reader = BinaryReader(
        StreamManager.GetStream(self.raw_verification_contract)
    )
    contract = VerificationContract()
    contract.Deserialize(contract_reader)

    self.assertEqual(contract.Address, self.expected_address)
    self.assertEqual(contract.IsStandard, self.expected_isStandard)
    self.assertEqual(contract.ParameterList, self.expected_ParameterList)

    # Hash-like attributes are compared through their string form.
    public_key_hash = contract.PublicKeyHash.ToString()
    self.assertEqual(public_key_hash, self.expected_PublicKeyHash)

    self.assertEqual(contract.Script, self.expected_Script)

    script_hash = contract.ScriptHash.ToString()
    self.assertEqual(script_hash, self.expected_ScriptHash)

    self.assertEqual(contract.Size(), self.expected_size)
def setUp(self):
    """Deserialize the raw consensus payload fixture into ``self.cp``."""
    self.stream = StreamManager.GetStream(self.consensus_payload_raw)
    payload_reader = BinaryReader(self.stream)

    payload = ConsensusPayload()
    self.cp = payload
    payload.Deserialize(payload_reader)
# NOTE(review): this is the tail of a test body; the enclosing `def` and the
# code that creates `account` and `asset` are not part of this excerpt.
# Subtract from the asset balance and check the amount that remains.
account.SubtractFromBalance(asset.AssetId, Fixed8(1220000000))
self.assertEqual(account.BalanceFor(asset.AssetId), Fixed8(1220000000))
self.assertEqual(account.HasBalance(asset.AssetId), True)
# Subtract from the system-share balance as well; the account must still not
# report "all balances zero or less".
sshare_hash = Blockchain.SystemShare().Hash
account.SubtractFromBalance(sshare_hash, Fixed8(800000000))
self.assertFalse(account.AllBalancesZeroOrLess())
# Round trip: capture the JSON form, serialize into a managed stream, rebuild
# the account from the serialized bytes and compare the JSON forms.
acct_json = account.ToJson()
stream = StreamManager.GetStream()
writer = BinaryWriter(stream)
account.Serialize(writer)
out = stream.ToArray()
StreamManager.ReleaseStream(stream)
# NOTE(review): `input` shadows the builtin of the same name. The unhexlify
# call implies ToArray() yields hex-encoded bytes - confirm against StreamManager.
input = binascii.unhexlify(out)
newaccount = AccountState.DeserializeFromDB(input)
self.assertEqual(acct_json, newaccount.ToJson())
def test_tx_big_remark(self):
    """Deserialize the large transaction fixture and verify its hash.

    The fixture file holds the hex-encoded transaction and is looked up
    relative to the current working directory.
    """
    fixture_path = '%s/fixtures/bigtx.txt' % os.getcwd()
    with open(fixture_path, 'rb') as fixture:
        raw_hex = fixture.read().strip()

    tx_stream = StreamManager.GetStream(binascii.unhexlify(raw_hex))
    tx = Transaction.DeserializeFrom(BinaryReader(tx_stream))
    self.assertEqual(tx.Hash.ToString(), self.giant_tx_hash)
def AsSerializableWithType(buffer, class_name):
    """
    Deserialize `buffer` into a fresh instance of the class named by `class_name`.

    The class is resolved by importing everything before the last dot as a
    module and reading the final component from it as an attribute. Import
    and attribute errors raised while resolving the class propagate to the
    caller; only errors raised while constructing or deserializing the
    instance are logged and turned into a ``None`` result.

    Args:
        buffer (BytesIO/bytes): stream to deserialize `class_name` to.
        class_name (str): a full path to the class to be deserialized into. e.g. 'neo.Core.Block.Block'

    Returns:
        object: if deserialization is successful.
        None: if deserialization failed.
    """
    module_path, _, type_name = class_name.rpartition('.')
    target_cls = getattr(importlib.import_module(module_path), type_name)

    stream = StreamManager.GetStream(buffer)
    stream_reader = BinaryReader(stream)
    result = None
    try:
        instance = target_cls()
        instance.Deserialize(stream_reader)
        result = instance
    except Exception as e:
        logger.error("Could not deserialize: %s %s" % (e, class_name))
    finally:
        # The managed stream is always handed back, on success or failure.
        StreamManager.ReleaseStream(stream)
    return result
def OnAddHeader(self, header):
# Record a header: add its hash to the in-memory header index, write batches
# of 2000 indexed hashes to the database, and store the header plus the
# current-header pointer.
# NOTE(review): indentation was lost in this copy, so the nesting of the
# `if` / `with` / `while` bodies below cannot be read from the text - confirm
# against the original file before changing any logic.
hHash = header.Hash.ToBytes()
# Append the hash only when it is not already indexed.
if hHash not in self._header_index:
self._header_index.append(hHash)
with self._db.write_batch() as wb:
# Loop while the header index is at least 2000 past the stored header count.
while header.Index - 2000 >= self._stored_header_count:
ms = StreamManager.GetStream()
w = BinaryWriter(ms)
# Slice of up to 2000 hashes starting at the stored header count.
headers_to_write = self._header_index[self._stored_header_count:self._stored_header_count + 2000]
w.Write2000256List(headers_to_write)
out = ms.ToArray()
StreamManager.ReleaseStream(ms)
# Key: header-hash-list prefix + stored header count as 4 little-endian bytes.
wb.put(DBPrefix.IX_HeaderHashList + self._stored_header_count.to_bytes(4, 'little'), out)
self._stored_header_count += 2000
with self._db.write_batch() as wb:
# Write the header only when nothing is stored under this hash yet; the
# value is 8 zero bytes followed by the serialized header.
if self._db.get(DBPrefix.DATA_Block + hHash) is None:
wb.put(DBPrefix.DATA_Block + hHash, bytes(8) + header.ToArray())
# Current-header pointer: hash followed by the index as 4 little-endian bytes.
wb.put(DBPrefix.SYS_CurrentHeader, hHash + header.Index.to_bytes(4, 'little'))
def DeserializeFromBufer(buffer, offset=0):
    """
    Deserialize object instance from the specified buffer.

    The managed stream created for `buffer` is released even when
    deserialization raises, so a malformed buffer does not leave the stream
    registered with the StreamManager.

    Args:
        buffer (bytes, bytearray, BytesIO): data to create the stream from.
        offset: UNUSED

    Returns:
        Transaction:

    Raises:
        Exception: whatever `Transaction.DeserializeFrom` raises is propagated
            unchanged after the stream has been released.
    """
    mstream = StreamManager.GetStream(buffer)
    try:
        reader = BinaryReader(mstream)
        return Transaction.DeserializeFrom(reader)
    finally:
        # Runs on both the success and the error path.
        StreamManager.ReleaseStream(mstream)
def FromJson(jsn, isMultiSig=True):
# Rebuild a ContractParametersContext from its JSON string form.
# NOTE(review): this copy is truncated - the `try:` below has no visible
# except/finally clause and no return statement is shown; indentation was
# also lost, so nesting must be confirmed against the original file.
try:
parsed = json.loads(jsn)
# Only the 'Neo.Core.ContractTransaction' type is handled in the visible code.
if parsed['type'] == 'Neo.Core.ContractTransaction':
verifiable = ContractTransaction()
# parsed['hex'] is unhexlified and read back with DeserializeUnsigned.
ms = StreamManager.GetStream(binascii.unhexlify(parsed['hex']))
r = BinaryReader(ms)
verifiable.DeserializeUnsigned(r)
StreamManager.ReleaseStream(ms)
context = ContractParametersContext(verifiable, isMultiSig=isMultiSig)
for key, value in parsed['items'].items():
# NOTE(review): checks for "0x" anywhere in the key but drops the first two
# characters - presumably the key carries a leading "0x" prefix; confirm.
if "0x" in key:
key = key[2:]
key = key.encode()
# One 0 entry is appended per parameter whose type is 'Signature'.
parameterbytes = []
for pt in value['parameters']:
if pt['type'] == 'Signature':
parameterbytes.append(0)
contract = Contract.Create(value['script'], parameterbytes, key)
context.ContextItems[key] = ContextItem(contract)
# Copy over any signatures already present in the JSON item.
if 'signatures' in value:
context.ContextItems[key].Signatures = value['signatures']
def DeserializeFromDB(buffer):
    """
    Deserialize full object.

    The managed stream created for `buffer` is released even when
    deserialization raises, so a corrupt record does not leave the stream
    registered with the StreamManager.

    Args:
        buffer (bytes, bytearray, BytesIO): data to create the stream from.

    Returns:
        ContractState:

    Raises:
        Exception: whatever `ContractState.Deserialize` raises is propagated
            unchanged after the stream has been released.
    """
    m = StreamManager.GetStream(buffer)
    try:
        reader = BinaryReader(m)
        c = ContractState()
        c.Deserialize(reader)
        return c
    finally:
        # Runs on both the success and the error path.
        StreamManager.ReleaseStream(m)