Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_estimate_block_num(self):
bts = self.bts
b = Blockchain(steem_instance=bts)
last_block = b.get_current_block()
num = last_block.identifier
old_block = Block(num - 60, steem_instance=bts)
date = old_block.time()
est_block_num = b.get_estimated_block_num(date, accurate=False)
self.assertTrue((est_block_num - (old_block.identifier)) < 10)
est_block_num = b.get_estimated_block_num(date, accurate=True)
self.assertTrue((est_block_num - (old_block.identifier)) < 2)
est_block_num = b.get_estimated_block_num(date, estimateForwards=True, accurate=True)
self.assertTrue((est_block_num - (old_block.identifier)) < 2)
est_block_num = b.get_estimated_block_num(date, estimateForwards=True, accurate=False)
bts = self.bts
test_block_id = self.test_block_id
block = Block(test_block_id, only_ops=True, steem_instance=bts)
self.assertEqual(block.identifier, test_block_id)
self.assertTrue(isinstance(block.time(), datetime))
self.assertTrue(isinstance(block, dict))
self.assertTrue(len(block.operations))
self.assertTrue(isinstance(block.ops_statistics(), dict))
block2 = Block(test_block_id + 1, steem_instance=bts)
self.assertTrue(block2.time() > block.time())
with self.assertRaises(
exceptions.BlockDoesNotExistsException
):
Block(0, steem_instance=bts)
def test_block(self, node_param):
if node_param == "instance":
set_shared_steem_instance(self.bts)
o = Block(1)
self.assertIn(o.steem.rpc.url, self.urls)
with self.assertRaises(
RPCConnection
):
Block(1, steem_instance=Steem(node="https://abc.d", autoconnect=False, num_retries=1))
else:
set_shared_steem_instance(Steem(node="https://abc.d", autoconnect=False, num_retries=1))
stm = self.bts
o = Block(1, steem_instance=stm)
self.assertIn(o.steem.rpc.url, self.urls)
with self.assertRaises(
RPCConnection
):
Block(1)
def test_block(self, node_param):
if node_param == "instance":
set_shared_steem_instance(self.bts)
o = Block(1)
self.assertIn(o.steem.rpc.url, self.urls)
with self.assertRaises(
RPCConnection
):
Block(1, steem_instance=Steem(node="https://abc.d", autoconnect=False, num_retries=1))
else:
set_shared_steem_instance(Steem(node="https://abc.d", autoconnect=False, num_retries=1))
stm = self.bts
o = Block(1, steem_instance=stm)
self.assertIn(o.steem.rpc.url, self.urls)
with self.assertRaises(
RPCConnection
):
Block(1)
def test_verify_transaction(self):
stm = self.stm
block = Block(22005665, steem_instance=stm)
trx = block.transactions[28]
signed_tx = Signed_Transaction(trx)
key = signed_tx.verify(chain=stm.chain_params, recover_parameter=False)
public_key = format(Base58(key[0]), stm.prefix)
self.assertEqual(public_key, "STM4tzr1wjmuov9ftXR6QNv7qDWsbShMBPQpuwatZsfSc5pKjRDfq")
def test_blockchain(self):
bts = self.bts
b = Blockchain(steem_instance=bts)
num = b.get_current_block_num()
self.assertTrue(num > 0)
self.assertTrue(isinstance(num, int))
block = b.get_current_block()
self.assertTrue(isinstance(block, Block))
self.assertTrue(num <= block.identifier)
block_time = b.block_time(block.identifier)
self.assertEqual(block.time(), block_time)
block_timestamp = b.block_timestamp(block.identifier)
timestamp = int(time.mktime(block.time().timetuple()))
self.assertEqual(block_timestamp, timestamp)
repetition = 0
# can't return the block before the chain has reached it (support future block_num)
while last_current_block_num < block_number:
repetition += 1
time.sleep(self.block_interval)
if last_current_block_num - block_number < 50:
last_current_block_num = self.get_current_block_num()
if repetition > blocks_waiting_for * self.max_block_wait_repetition:
raise BlockWaitTimeExceeded("Already waited %d s" % (blocks_waiting_for * self.max_block_wait_repetition * self.block_interval))
# block has to be returned properly
repetition = 0
cnt = 0
block = None
while (block is None or block.block_num is None or int(block.block_num) != block_number) and (block_number_check_cnt < 0 or cnt < block_number_check_cnt):
try:
block = Block(block_number, only_ops=only_ops, only_virtual_ops=only_virtual_ops, steem_instance=self.steem)
cnt += 1
except BlockDoesNotExistsException:
block = None
if repetition > blocks_waiting_for * self.max_block_wait_repetition:
raise BlockWaitTimeExceeded("Already waited %d s" % (blocks_waiting_for * self.max_block_wait_repetition * self.block_interval))
repetition += 1
time.sleep(self.block_interval)
return block
def get_current_block(self, only_ops=False, only_virtual_ops=False):
""" This call returns the current block
:param bool only_ops: Returns block with operations only, when set to True (default: False)
:param bool only_virtual_ops: Includes only virtual operations (default: False)
.. note:: The block number returned depends on the ``mode`` used
when instantiating from this class.
"""
return Block(
self.get_current_block_num(),
only_ops=only_ops,
only_virtual_ops=only_virtual_ops,
steem_instance=self.steem
)