Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_deploy_and_invoke(deploy_script, invoke_args, wallet, min_fee=DEFAULT_MIN_FEE):
bc = GetBlockchain()
sn = bc._db.snapshot()
accounts = DBCollection(bc._db, sn, DBPrefix.ST_Account, AccountState)
assets = DBCollection(bc._db, sn, DBPrefix.ST_Asset, AssetState)
validators = DBCollection(bc._db, sn, DBPrefix.ST_Validator, ValidatorState)
contracts = DBCollection(bc._db, sn, DBPrefix.ST_Contract, ContractState)
storages = DBCollection(bc._db, sn, DBPrefix.ST_Storage, StorageItem)
if settings.USE_DEBUG_STORAGE:
debug_storage = DebugStorage.instance()
debug_sn = debug_storage.db.snapshot()
storages = DBCollection(debug_storage.db, debug_sn, DBPrefix.ST_Storage, StorageItem)
storages.DebugStorage = True
dtx = InvocationTransaction()
dtx.Version = 1
dtx.outputs = []
dtx.inputs = []
dtx.scripts = []
def SearchContracts(self, query):
res = []
sn = self._db.snapshot()
contracts = DBCollection(self._db, sn, DBPrefix.ST_Contract, ContractState)
keys = contracts.Keys
query = query.casefold()
for item in keys:
contract = contracts.TryGet(keyval=item)
try:
if query in contract.Name.decode('utf-8').casefold():
res.append(contract)
elif query in contract.Author.decode('utf-8').casefold():
res.append(contract)
elif query in contract.Description.decode('utf-8').casefold():
res.append(contract)
elif query in contract.Email.decode('utf-8').casefold():
res.append(contract)
def ShowAllContracts(self):
contracts = DBCollection(self._db, DBPrefix.ST_Contract, ContractState)
keys = contracts.Keys
return keys
def GetSpentCoins(self, tx_hash):
if type(tx_hash) is not bytes:
tx_hash = bytes(tx_hash.encode('utf-8'))
sn = self._db.snapshot()
coins = DBCollection(self._db, sn, DBPrefix.ST_SpentCoin, SpentCoinState)
result = coins.TryGet(keyval=tx_hash)
sn.close()
return result
def Persist(self, block):
self._persisting_block = block
sn = self._db.snapshot()
accounts = DBCollection(self._db, sn, DBPrefix.ST_Account, AccountState)
unspentcoins = DBCollection(self._db, sn, DBPrefix.ST_Coin, UnspentCoinState)
spentcoins = DBCollection(self._db, sn, DBPrefix.ST_SpentCoin, SpentCoinState)
assets = DBCollection(self._db, sn, DBPrefix.ST_Asset, AssetState)
validators = DBCollection(self._db, sn, DBPrefix.ST_Validator, ValidatorState)
contracts = DBCollection(self._db, sn, DBPrefix.ST_Contract, ContractState)
storages = DBCollection(self._db, sn, DBPrefix.ST_Storage, StorageItem)
amount_sysfee = self.GetSysFeeAmount(block.PrevHash) + block.TotalFees().value
amount_sysfee_bytes = amount_sysfee.to_bytes(8, 'little')
to_dispatch = []
with self._db.write_batch() as wb:
wb.put(DBPrefix.DATA_Block + block.Hash.ToBytes(), amount_sysfee_bytes + block.Trim())
for tx in block.Transactions:
wb.put(DBPrefix.DATA_Transaction + tx.Hash.ToBytes(), block.IndexBytes() + tx.ToArray())
# go through all outputs and add unspent coins to them
def Persist(self, block):
self._persisting_block = block
sn = self._db.snapshot()
accounts = DBCollection(self._db, sn, DBPrefix.ST_Account, AccountState)
unspentcoins = DBCollection(self._db, sn, DBPrefix.ST_Coin, UnspentCoinState)
spentcoins = DBCollection(self._db, sn, DBPrefix.ST_SpentCoin, SpentCoinState)
assets = DBCollection(self._db, sn, DBPrefix.ST_Asset, AssetState)
validators = DBCollection(self._db, sn, DBPrefix.ST_Validator, ValidatorState)
contracts = DBCollection(self._db, sn, DBPrefix.ST_Contract, ContractState)
storages = DBCollection(self._db, sn, DBPrefix.ST_Storage, StorageItem)
amount_sysfee = self.GetSysFeeAmount(block.PrevHash) + block.TotalFees().value
amount_sysfee_bytes = amount_sysfee.to_bytes(8, 'little')
to_dispatch = []
with self._db.write_batch() as wb:
wb.put(DBPrefix.DATA_Block + block.Hash.ToBytes(), amount_sysfee_bytes + block.Trim())
for tx in block.Transactions:
def test_invoke(script, wallet, outputs, withdrawal_tx=None,
from_addr=None, min_fee=DEFAULT_MIN_FEE,
invoke_attrs=None, owners=None):
# print("invoke script %s " % script)
if from_addr is not None:
from_addr = PromptUtils.lookup_addr_str(wallet, from_addr)
bc = GetBlockchain()
accounts = DBCollection(bc._db, DBPrefix.ST_Account, AccountState)
assets = DBCollection(bc._db, DBPrefix.ST_Asset, AssetState)
validators = DBCollection(bc._db, DBPrefix.ST_Validator, ValidatorState)
contracts = DBCollection(bc._db, DBPrefix.ST_Contract, ContractState)
storages = DBCollection(bc._db, DBPrefix.ST_Storage, StorageItem)
# if we are using a withdrawal tx, don't recreate the invocation tx
# also, we don't want to reset the inputs / outputs
# since those were already calculated
if withdrawal_tx is not None:
tx = withdrawal_tx
else:
tx = InvocationTransaction()
tx.outputs = outputs
tx.inputs = []
tx.Version = 1
tx.scripts = []
def Persist(self, block):
self._persisting_block = block
sn = self._db.snapshot()
accounts = DBCollection(self._db, sn, DBPrefix.ST_Account, AccountState)
unspentcoins = DBCollection(self._db, sn, DBPrefix.ST_Coin, UnspentCoinState)
spentcoins = DBCollection(self._db, sn, DBPrefix.ST_SpentCoin, SpentCoinState)
assets = DBCollection(self._db, sn, DBPrefix.ST_Asset, AssetState)
validators = DBCollection(self._db, sn, DBPrefix.ST_Validator, ValidatorState)
contracts = DBCollection(self._db, sn, DBPrefix.ST_Contract, ContractState)
storages = DBCollection(self._db, sn, DBPrefix.ST_Storage, StorageItem)
amount_sysfee = self.GetSysFeeAmount(block.PrevHash) + block.TotalFees().value
amount_sysfee_bytes = amount_sysfee.to_bytes(8, 'little')
to_dispatch = []
with self._db.write_batch() as wb:
wb.put(DBPrefix.DATA_Block + block.Hash.ToBytes(), amount_sysfee_bytes + block.Trim())
for tx in block.Transactions:
wb.put(DBPrefix.DATA_Transaction + tx.Hash.ToBytes(), block.IndexBytes() + tx.ToArray())
# go through all outputs and add unspent coins to them
def GetUnclaimed(self, hash):
tx, height = self.GetTransaction(hash)
if tx is None:
return None
out = {}
coins = DBCollection(self._db, DBPrefix.ST_SpentCoin, SpentCoinState)
state = coins.TryGet(keyval=hash.ToBytes())
if state:
for item in state.Items:
out[item.index] = SpentCoin(tx.outputs[item.index], height, item.height)
return out