Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
nodelist = NodeList()
nodelist.update_nodes(steem_instance=Steem(node=nodelist.get_nodes(hive=True), num_retries=10))
cls.bts = Steem(
node=nodelist.get_nodes(hive=True),
nobroadcast=True,
num_retries=10,
timeout=30,
use_condenser=False,
keys={"active": wif},
)
# from getpass import getpass
# self.bts.wallet.unlock(getpass())
set_shared_steem_instance(cls.bts)
cls.bts.set_default_account("test")
b = Blockchain(steem_instance=cls.bts)
num = b.get_current_block_num()
cls.start = num - 20
cls.stop = num
cls.max_batch_size = 1 # appbase does not support batch rpc calls at the momement (internal error)
def test_wait_for_and_get_block(self):
bts = self.bts
b = Blockchain(steem_instance=bts, max_block_wait_repetition=18)
start_num = b.get_current_block_num()
blocknum = start_num
last_fetched_block_num = None
for i in range(3):
block = b.wait_for_and_get_block(blocknum)
last_fetched_block_num = block.block_num
blocknum = last_fetched_block_num + 1
self.assertEqual(last_fetched_block_num, start_num + 2)
b2 = Blockchain(steem_instance=bts, max_block_wait_repetition=1)
with self.assertRaises(
BlockWaitTimeExceeded
):
for i in range(300):
block = b2.wait_for_and_get_block(blocknum)
last_fetched_block_num = block.block_num
def test_blockchain(self):
bts = self.bts
b = Blockchain(steem_instance=bts)
num = b.get_current_block_num()
self.assertTrue(num > 0)
self.assertTrue(isinstance(num, int))
block = b.get_current_block()
self.assertTrue(isinstance(block, Block))
self.assertTrue(num <= block.identifier)
block_time = b.block_time(block.identifier)
self.assertEqual(block.time(), block_time)
block_timestamp = b.block_timestamp(block.identifier)
timestamp = int(time.mktime(block.time().timetuple()))
self.assertEqual(block_timestamp, timestamp)
def test_estimate_block_num(self):
bts = self.bts
b = Blockchain(steem_instance=bts)
last_block = b.get_current_block()
num = last_block.identifier
old_block = Block(num - 60, steem_instance=bts)
date = old_block.time()
est_block_num = b.get_estimated_block_num(date, accurate=False)
self.assertTrue((est_block_num - (old_block.identifier)) < 10)
est_block_num = b.get_estimated_block_num(date, accurate=True)
self.assertTrue((est_block_num - (old_block.identifier)) < 2)
est_block_num = b.get_estimated_block_num(date, estimateForwards=True, accurate=True)
self.assertTrue((est_block_num - (old_block.identifier)) < 2)
est_block_num = b.get_estimated_block_num(date, estimateForwards=True, accurate=False)
def setUpClass(cls):
nodelist = NodeList()
nodelist.update_nodes(steem_instance=Steem(node=nodelist.get_nodes(exclude_limited=False), num_retries=10))
cls.bts = Steem(
node=nodelist.get_nodes(exclude_limited=True),
nobroadcast=True,
timeout=30,
num_retries=30,
)
# from getpass import getpass
# self.bts.wallet.unlock(getpass())
set_shared_steem_instance(cls.bts)
cls.bts.set_default_account("test")
b = Blockchain(steem_instance=cls.bts)
num = b.get_current_block_num()
# num = 23346630
cls.start = num - 25
cls.stop = num
# cls.N_transfer = 121
stm.wallet.addPrivateKey(posting_privkey)
else:
stm = Steem(node=testnet_node,
wif={'active': str(active_privkey),
'posting': str(posting_privkey),
'memo': str(memo_privkey)})
account = Account(username, steem_instance=stm)
account.disallow("beem1", permission='posting')
account.allow('beem1', weight=1, permission='posting', account=None)
if useWallet:
stm.wallet.getAccountFromPrivateKey(str(active_privkey))
# stm.create_account("beem1", creator=account, password=password1)
account1 = Account("beem1", steem_instance=stm)
b = Blockchain(steem_instance=stm)
blocknum = b.get_current_block().identifier
account.transfer("beem1", 1, "SBD", "test")
b1 = Block(blocknum, steem_instance=stm)
def test_signing_appbase(self):
b = Blockchain(steem_instance=self.bts)
st = None
for block in b.blocks(start=25304468, stop=25304468):
for trx in block.transactions:
st = Signed_Transaction(trx.copy())
self.assertTrue(st is not None)
def stream_votes(stm, threading, thread_num):
b = Blockchain(steem_instance=stm)
opcount = 0
start_time = time.time()
for op in b.stream(start=23483000, stop=23483200, threading=threading, thread_num=thread_num,
opNames=['vote']):
sys.stdout.write("\r%s" % op['block_num'])
opcount += 1
now = time.time()
total_duration = now - start_time
print(" votes: %d, time %.2f" % (opcount, total_duration))
return opcount, total_duration
acc = Account(name, steem_instance=stm)
acc_dict[name] = acc
if clear_acc_cache:
acc.clear_cache()
acc_dict = {}
if clear_all_cache:
clear_cache()
if not shared_instance:
del stm.rpc
if __name__ == "__main__":
stm = Steem()
print("Shared instance: " + str(stm))
set_shared_steem_instance(stm)
b = Blockchain()
account_list = []
for a in b.get_all_accounts(limit=500):
account_list.append(a)
shared_instance = False
clear_acc_cache = False
clear_all_cache = False
node = "https://api.steemit.com"
n = 3
for i in range(1, n + 1):
print("%d of %d" % (i, n))
profiling(node, account_list, shared_instance=shared_instance, clear_acc_cache=clear_acc_cache, clear_all_cache=clear_all_cache)
"""
def get_blocknum(index):
op = self._get_account_history(start=(index))
return op[0][1]['block']
max_index = self.virtual_op_count()
if max_index < stop_diff:
return 0
# calculate everything with block numbers
created = get_blocknum(0)
# convert blocktime to block number if given as a datetime/date/time
if isinstance(blocktime, (datetime, date, time)):
b = Blockchain(steem_instance=self.steem)
target_blocknum = b.get_estimated_block_num(addTzInfo(blocktime), accurate=True)
else:
target_blocknum = blocktime
# the requested blocknum/timestamp is before the account creation date
if target_blocknum <= created:
return 0
# get the block number from the account's latest operation
latest_blocknum = get_blocknum(-1)
# requested blocknum/timestamp is after the latest account operation
if target_blocknum >= latest_blocknum:
return max_index
# all account ops in a single block