Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_contract_create_block(self):
    """Persist the contract-creation block and verify the stored contract.

    Deserializes the hex-encoded block fixture, persists it through the
    default blockchain, then looks the contract up by hash in a snapshot
    of the contract store and checks its recorded metadata.
    """
    raw_block = binascii.unhexlify(self.contract_create_block)
    block = Helper.AsSerializableWithType(raw_block, 'neo.Core.Block.Block')
    self.assertEqual(block.Index, self.contract_block_index)

    persisted = Blockchain.Default().Persist(block)
    self.assertTrue(persisted)

    # Read the contract back through a fresh snapshot of the database.
    db_view = Blockchain.Default()._db.snapshot()
    contract_store = DBCollection(
        Blockchain.Default()._db,
        db_view,
        DBPrefix.ST_Contract,
        ContractState,
    )
    stored = contract_store.TryGet(self.contract_hash)

    self.assertIsNotNone(stored)
    self.assertEqual(stored.HasStorage, False)
    self.assertEqual(stored.Name, b'test create')
    self.assertEqual(stored.Email, b'flow@neo.org')

    code = stored.Code
def do_quit():
    """Shut everything down and terminate the process with exit status 0.

    Prints a notice, disposes the default blockchain, stops the reactor and
    shuts down the node leader, in that order, before exiting.
    """
    print('Shutting down.')

    chain = Blockchain.Default()
    chain.Dispose()

    reactor.stop()

    leader = NodeLeader.Instance()
    leader.Shutdown()

    sys.exit(0)
async def custom_background_code():
    """Log the current block height and header height every 15 seconds, forever."""
    report_interval = 15
    while True:
        block_height = str(Blockchain.Default().Height)
        header_height = str(Blockchain.Default().HeaderHeight)
        logger.info("Block %s / %s", block_height, header_height)
        await asyncio.sleep(report_interval)
def height_check(self):
# Wait for the header height to settle, print wallet/chain status, and
# rebuild the wallet when it lags behind the chain or holds no gas.
# NOTE(review): indentation was lost in this excerpt, so the exact extent of
# the two `while` bodies and the `if` body cannot be confirmed from here.
last_height = 0
# Balances of the first two entries returned by GetCoinAssets(); the variable
# names suggest index 0 is NEO and index 1 is GAS — TODO confirm the ordering.
# `neo` is not read again in the visible code.
neo = self.Wallet.GetBalance(self.Wallet.GetCoinAssets()[0])
gas = self.Wallet.GetBalance(self.Wallet.GetCoinAssets()[1])
# Keep sampling every 5 seconds while the header height has advanced by more
# than 10 since the previous sample.
while (Blockchain.Default().HeaderHeight - last_height) > 10:
print('Updating Height...')
last_height = Blockchain.Default().HeaderHeight
sleep(5)
print('Wallet Height: ', self.Wallet._current_height)
print('Blockchain Height: ', Blockchain.Default().Height)
print('Header Height: ', Blockchain.Default().HeaderHeight)
print('Gas Balance: ', gas)
# Rebuild when the wallet is 20 or more blocks behind the chain, or when the
# gas balance converts to zero.
if self.Wallet._current_height <= Blockchain.Default().Height-20 or gas.ToInt() == 0:
self.Wallet.Rebuild()
# Poll every 5 seconds until the wallet is within 2 blocks of the chain height.
while self.Wallet._current_height < Blockchain.Default().Height-2:
print('Rebuilding Wallet..')
sleep(5)
def setUpClass(cls):
    """Replace the registered blockchain with one backed by the test LevelDB path.

    Deregisters whatever blockchain is currently registered, makes sure the
    test database directory exists, then builds a new blockchain on it (with
    the version check skipped) and registers it.
    """
    Blockchain.DeregisterBlockchain()

    db_path = cls.LEVELDB_TESTPATH
    os.makedirs(db_path, exist_ok=True)

    test_db = getBlockchainDB(cls.LEVELDB_TESTPATH)
    cls._blockchain = Blockchain(test_db, skip_version_check=True)
    Blockchain.RegisterBlockchain(cls._blockchain)
def RequestWithdrawFrom(wallet, asset_id, contract_hash, to_addr, amount, require_password=True):
# Validate a request to withdraw NEO or GAS associated with a contract and
# start looking up the unspent coins for it.
# NOTE(review): this excerpt is truncated in the middle of the final call, so
# the rest of the function (including any use of `require_password`, which is
# not read in the visible portion) cannot be documented from here.
# Raises Exception when `asset_id` is not 'neo' or 'gas' (case-insensitive).
# Returns False when the contract's script hash is not in the wallet's
# watch-only list, or when the parsed amount is negative.
asset_type = asset_id.lower()
if asset_type not in ['neo', 'gas']:
raise Exception('please specify neo or gas to withdraw')
# Keep the caller-supplied address before `to_addr` is replaced by its parsed form.
readable_addr = to_addr
# From here on `asset_id` holds the value resolved by get_asset_id(), not the name passed in.
asset_id = get_asset_id(wallet, asset_type)
contract = Blockchain.Default().GetContract(contract_hash)
shash = contract.Code.ScriptHash()
contract_addr = Crypto.ToAddress(shash)
to_addr = parse_param(to_addr, wallet)
amount = get_asset_amount(amount, asset_id)
# The contract's script hash must already be among the wallet's watch-only entries.
if shash not in wallet._watch_only:
print("Add withdrawal contract address to your watch only: import watch_addr %s " % contract_addr)
return False
if amount < Fixed8.Zero():
print("Cannot withdraw negative amount")
return False
unspents = wallet.FindUnspentCoinsByAssetAndTotal(
def Relay(self, inventory):
    """Validate an inventory item, add it locally and relay it to peers.

    Args:
        inventory: the item to relay. The visible code handles three cases:
            an exact ``Block``, any ``Transaction`` (or subclass), and any
            other object exposing ``Verify()``.

    Returns:
        False when the item is rejected: it is a miner transaction, its hash
        is already known, no default blockchain exists, the block is already
        in the chain, or adding/verifying it fails.
        NOTE(review): on the success path the visible code falls off the end
        without returning ``relayed``; confirm whether a ``return`` follows
        outside this excerpt before relying on the return value.
    """
    # The original tested `inventory is MinerTransaction`, which compares an
    # instance against the class object and is therefore never true.
    if isinstance(inventory, MinerTransaction):
        return False

    # lock known hashes
    if inventory.Hash() in self._known_hashes:
        return False
    # endlock

    self.InventoryReceiving.on_change(self, inventory)

    # Exact type match is kept on purpose so that objects which are not
    # precisely `Block` keep flowing to the branches below, as before.
    if type(inventory) is Block:
        if Blockchain.Default() is None:
            return False

        if Blockchain.Default().ContainsBlock(inventory.HashToByteString()):
            self.__log.debug("cant add block %s because blockchain already contains it " % inventory.HashToByteString())
            return False

        # The original format string had no `%s`, so the `%` operator raised
        # TypeError ("not all arguments converted") every time this ran.
        self.__log.debug("Will Try to add block %s" % inventory.HashToByteString())

        if not Blockchain.Default().AddBlock(inventory):
            return False

    elif isinstance(inventory, Transaction):
        if not self.AddTransaction(inventory):
            return False

    else:
        if not inventory.Verify():
            return False

    relayed = self.RelayDirectly(inventory)

    self.InventoryReceived.on_change(inventory)
# Drive the event loop until it stops or the user interrupts it, then release
# the loop and the databases.
# NOTE(review): indentation was lost in this excerpt; whether the statements
# after `loop.close()` belong to the `finally` clause cannot be confirmed here.
try:
loop.run_forever()
except KeyboardInterrupt:
# On interrupt, run shutdown() to completion; an asyncio.CancelledError raised
# while doing so is deliberately ignored.
with suppress(asyncio.CancelledError):
loop.run_until_complete(asyncio.gather(shutdown()))
loop.stop()
finally:
loop.close()
# Run things
# NodeLeader.Instance().Start()
# After the reactor is stopped, gracefully shutdown the database.
# NOTE(review): the visible code drives an asyncio loop rather than a reactor,
# so the wording above may be stale — confirm.
NotificationDB.close()
Blockchain.Default().Dispose()
# NodeLeader.Instance().Shutdown()
def main():
    """Set up the blockchain, start networking and a background thread, then run the reactor.

    Blocks inside ``reactor.run()`` until the reactor is stopped, and logs a
    shutdown message afterwards.
    """
    # Setup the blockchain
    blockchain = LevelDBBlockchain(settings.LEVELDB_PATH)
    Blockchain.RegisterBlockchain(blockchain)

    # Call PersistBlocks repeatedly at the interval passed to start().
    dbloop = task.LoopingCall(Blockchain.Default().PersistBlocks)
    dbloop.start(.1)
    NodeLeader.Instance().Start()

    # Disable smart contract events for external smart contracts
    settings.set_log_smart_contract_events(False)

    # Start a thread with custom code. It is created as a daemon thread so it
    # is killed when the main thread quits; the constructor argument replaces
    # the deprecated Thread.setDaemon() call.
    # NOTE(review): `custom_background_code` appears as `async def` in this
    # source; a coroutine function used as a thread target only creates a
    # coroutine object and never runs its body — confirm which variant is
    # actually in scope here.
    d = threading.Thread(target=custom_background_code, daemon=True)
    d.start()

    # Run all the things (blocking call)
    logger.info("Everything setup and running. Waiting for events...")
    reactor.run()
    logger.info("Shutting down.")
# NOTE(review): this is the tail of a block-building function whose header is
# outside this excerpt; `script`, `prev_hash`, `timestamp`, `index`,
# `consensus_data` and `next_consensus` are defined before the visible lines.
# Miner transaction with a fixed nonce.
mt = MinerTransaction()
mt.Nonce = 2083236893
# Output carrying the SystemShare hash and amount, addressed to the script hash
# of a multi-signature redeem script over the standby validators. The required
# signature count is int(len(validators) / 2) + 1.
output = TransactionOutput(
Blockchain.SystemShare().Hash,
Blockchain.SystemShare().Amount,
Crypto.ToScriptHash(Contract.CreateMultiSigRedeemScript(int(len(Blockchain.StandbyValidators()) / 2) + 1,
Blockchain.StandbyValidators()))
)
# Issue transaction whose only output is the one built above.
it = IssueTransaction([], [output], [], [script])
# The returned block contains, in order: the miner transaction, SystemShare,
# SystemCoin and the issue transaction.
# NOTE(review): the meaning of the trailing `True` argument is not visible here.
return Block(prev_hash, timestamp, index, consensus_data,
next_consensus, script,
[mt, Blockchain.SystemShare(), Blockchain.SystemCoin(), it],
True)