Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_initialize(self):
    """Check that the NodeLeader singleton exposes empty peer and address lists.

    Both ``Peers`` and ``KNOWN_ADDRS`` are compared against ``[]``, in that
    order; the first mismatch fails the test.
    """
    node_leader = NodeLeader.Instance()
    # Attributes are looked up lazily, one per iteration, so a failure on
    # Peers stops before KNOWN_ADDRS is ever read.
    for attr_name in ("Peers", "KNOWN_ADDRS"):
        self.assertEqual(getattr(node_leader, attr_name), [])
# NOTE(review): excerpt only -- the enclosing definition and the original
# indentation are not visible here.
# Start-up and shutdown sequence for the interactive prompt: open and register
# the LevelDB chain, start the notification DB when one is available, run the
# CLI and the NodeLeader under the reactor, then clean up once the reactor stops.
# Instantiate the blockchain and subscribe to notifications
blockchain = LevelDBBlockchain(settings.chain_leveldb_path)
Blockchain.RegisterBlockchain(blockchain)
# Try to set up a notification db
# instance() is treated as optional: a falsy result skips start().
if NotificationDB.instance():
NotificationDB.instance().start()
# Start the prompt interface
# The history file lives under settings.DATA_DIR_PATH.
fn_prompt_history = os.path.join(settings.DATA_DIR_PATH, '.prompt.py.history')
cli = PromptInterface(fn_prompt_history)
# Run things
# reactor.suggestThreadPoolSize(15)
# cli.run is handed to callInThread so it does not occupy the thread that
# calls the blocking reactor.run() below (assumes `reactor` is Twisted's -- TODO confirm).
reactor.callInThread(cli.run)
NodeLeader.Instance().Start()
# reactor.run() is blocking, until `quit()` is called which stops the reactor.
reactor.run()
# After the reactor is stopped, gracefully shutdown the database.
# NOTE(review): close() is called on the class without the instance() guard
# used above -- confirm it tolerates a missing notification DB.
NotificationDB.close()
Blockchain.Default().Dispose()
NodeLeader.Instance().Shutdown()
# NOTE(review): excerpt only -- the enclosing definition, the construction of
# `parser`, and the original indentation are not visible here.
# Start-up sequence for the private-net claim-all tool: validate the output
# path, switch settings to privnet, open the chain and run PrivnetClaimall
# under the reactor.
args = parser.parse_args()
# Refuse to overwrite an existing wallet file: report it and exit with status 1.
if os.path.isfile(args.output):
print("Error: Wallet file %s already exists" % args.output)
exit(1)
settings.setup_privnet()
print("Blockchain DB path:", settings.chain_leveldb_path)
# An existing chain directory only triggers a warning; execution continues.
if os.path.exists(settings.chain_leveldb_path):
print("Warning: Chain database already exists. If this is from a previous private network, you need to delete %s" % settings.chain_leveldb_path)
# Open the LevelDB-backed chain and register it with Blockchain.
blockchain = LevelDBBlockchain(settings.chain_leveldb_path)
Blockchain.RegisterBlockchain(blockchain)
reactor.suggestThreadPoolSize(15)
NodeLeader.Instance().Start()
# The claim job receives the output path, password, time and WIF-save options
# from the parsed arguments.
pc = PrivnetClaimall(args.output, args.password, args.time, args.save_privnet_wif)
# pc.run is handed to callInThread before the reactor starts; reactor.run()
# then blocks (assumes `reactor` is Twisted's -- TODO confirm).
reactor.callInThread(pc.run)
reactor.run()
def show_nodes(self):
    """Print the name and IO statistics of every NodeLeader peer.

    When the peer list is empty, prints "Not connected yet" instead.
    Otherwise one "Peer <name> - IO: <stats>" line per peer is rendered
    through ``print_tokens`` using ``self.token_style``.
    """
    # Guard clause: nothing to list without peers.
    if len(NodeLeader.Instance().Peers) == 0:
        print("Not connected yet\n")
        return

    peer_lines = [
        "Peer %s - IO: %s\n" % (node.Name(), node.IOStats())
        for node in NodeLeader.Instance().Peers
    ]
    print_tokens([(Token.Number, "".join(peer_lines))], self.token_style)
# NOTE(review): excerpt only -- the enclosing definition, the origin of `args`,
# and the original indentation are not visible here.
# Start-up sequence for TheConstructInterface: open and register the LevelDB
# chain, start the notification DB when available, then run the CLI and the
# NodeLeader under the reactor.
# Instantiate the blockchain and subscribe to notifications
blockchain = LevelDBBlockchain(settings.LEVELDB_PATH)
Blockchain.RegisterBlockchain(blockchain)
# Try to set up a notification db
# instance() is treated as optional: a falsy result skips start().
if NotificationDB.instance():
NotificationDB.instance().start()
# The interface receives the parsed arguments and their debug flag.
cli = TheConstructInterface(input_args=args, debug=args.debug)
# Run
reactor.suggestThreadPoolSize(15)
# cli.run is handed to callInThread; reactor.run() below is the last call
# in this excerpt (assumes `reactor` is Twisted's -- TODO confirm).
reactor.callInThread(cli.run)
NodeLeader.Instance().Start()
reactor.run()
# NOTE(review): excerpt only -- this starts mid-function (the `return None`
# closes a branch whose condition is not visible) and the original
# indentation is lost.
# Signs `context` with `wallet`; a completed signature is saved, relayed through
# NodeLeader and then looked up again, otherwise the partial context is returned.
return None
wallet.Sign(context)
if context.Completed:
print("Signature complete, relaying...")
# Attach the collected scripts to the transaction before saving and relaying it.
tx = context.Verifiable
tx.scripts = context.GetScripts()
wallet.SaveTransaction(tx)
print("will send tx: %s " % json.dumps(tx.ToJson(), indent=4))
relayed = NodeLeader.Instance().Relay(tx)
if relayed:
print("Relayed Tx: %s " % tx.Hash.ToString())
# self.wait_for_tx is defined outside this excerpt; its result is only
# used to choose which message to print.
foundtx = self.wait_for_tx(tx)
if foundtx:
print("Transaction found, all is good!")
else:
print("Transaction NOT found!")
# NOTE(review): with indentation lost it is unclear whether 'success' is
# returned for every relayed tx or only on the not-found branch -- confirm.
return('success')
else:
print("Could not relay tx %s " % tx.Hash.ToString())
return('fail')
else:
print("Transaction signed, but the signature is still incomplete")
# The incomplete context is returned as compact JSON (no spaces after separators).
return(json.dumps(context.ToJson(), separators=(',', ':')))
# NOTE(review): excerpt only -- this starts mid-function (the `return None`
# closes a branch whose condition is not visible) and the original
# indentation is lost.
# Signs `context` with `wallet`; a completed signature is saved, relayed through
# NodeLeader and then looked up again, otherwise the partial context is returned.
return None
wallet.Sign(context)
if context.Completed:
print("Signature complete, relaying...")
# Attach the collected scripts to the transaction before saving and relaying it.
tx = context.Verifiable
tx.scripts = context.GetScripts()
wallet.SaveTransaction(tx)
print("will send tx: %s " % json.dumps(tx.ToJson(), indent=4))
relayed = NodeLeader.Instance().Relay(tx)
if relayed:
print("Relayed Tx: %s " % tx.Hash.ToString())
# self.wait_for_tx is defined outside this excerpt; its result is only
# used to choose which message to print.
foundtx = self.wait_for_tx(tx)
if foundtx:
print("Transaction found, all is good!")
else:
print("Transaction NOT found!")
# NOTE(review): with indentation lost it is unclear whether 'success' is
# returned for every relayed tx or only on the not-found branch -- confirm.
return('success')
else:
print("Could not relay tx %s " % tx.Hash.ToString())
return('fail')
else:
print("Transaction signed, but the signature is still incomplete")
# The incomplete context is returned as compact JSON (no spaces after separators).
return(json.dumps(context.ToJson(), separators=(',', ':')))
def wait_for_peers(self):
    """Block until the NodeLeader singleton reports at least one peer.

    The peer list is polled once per second; each unsuccessful poll is
    logged at debug level on ``self.logger``.
    """
    while True:
        if len(NodeLeader.Instance().Peers) != 0:
            return
        self.logger.debug('waiting for NodeLeader peers')
        sleep(1)
# NOTE(review): excerpt only -- the enclosing definition and the original
# indentation are not visible here.
# Start-up sequence for the REST API server on the private network: open and
# register the chain, persist blocks periodically, start the notification DB
# and the NodeLeader, then serve on port 8000.
settings.setup_privnet()
# Instantiate the blockchain and subscribe to notifications
blockchain = LevelDBBlockchain(settings.chain_leveldb_path)
Blockchain.RegisterBlockchain(blockchain)
# Schedule PersistBlocks as a repeating call with a 0.1 interval
# (presumably seconds, via twisted.internet.task.LoopingCall -- TODO confirm).
dbloop = task.LoopingCall(Blockchain.Default().PersistBlocks)
dbloop.start(.1)
# NOTE(review): start() is called without checking that instance() returned
# something usable -- confirm it cannot be None/falsy here.
ndb = NotificationDB.instance()
ndb.start()
api_server = RestApi()
# Run
reactor.suggestThreadPoolSize(15)
NodeLeader.Instance().Start()
port = 8000
logger.info("Starting rest-api server on port %s" % (port))
# Binds to all interfaces. NOTE(review): reactor.run() is not called in this
# excerpt; presumably app.run() starts the reactor itself -- confirm.
api_server.app.run('0.0.0.0', port)