Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.assertEqual(h_list[-1][1]['block'], h_all_raw[-10 + zero_element][1]['block'])
h_list = []
for h in account.get_account_history(10, 10, use_block_num=False, order=-1, raw_output=True):
h_list.append(h)
self.assertEqual(h_list[0][0], 10)
# self.assertEqual(h_list[-1][0], zero_element)
self.assertEqual(h_list[0][1]['block'], h_all_raw[-11 + zero_element][1]['block'])
self.assertEqual(h_list[-1][1]['block'], h_all_raw[-1][1]['block'])
h_list = []
for h in account.get_account_history(10, 10, use_block_num=False, start=9, stop=1, order=-1, raw_output=True):
h_list.append(h)
self.assertEqual(h_list[0][0], 9)
self.assertEqual(h_list[-1][0], 1)
self.assertEqual(h_list[0][1]['block'], h_all_raw[-10 + zero_element][1]['block'])
self.assertEqual(h_list[-1][1]['block'], h_all_raw[-2 + zero_element][1]['block'])
start = formatTimeString(h_list[0][1]["timestamp"])
stop = formatTimeString(h_list[-1][1]["timestamp"])
h_list = []
for h in account.get_account_history(10, 10, start=start, stop=stop, order=-1, raw_output=True):
h_list.append(h)
self.assertEqual(h_list[0][0], 9)
self.assertEqual(h_list[-1][0], 1)
self.assertEqual(h_list[0][1]['block'], h_all_raw[-10 + zero_element][1]['block'])
self.assertEqual(h_list[-1][1]['block'], h_all_raw[-2 + zero_element][1]['block'])
def test_formatDateTimetoTimeStamp(self):
t = "1970-01-01T00:00:00"
t = formatTimeString(t)
timestamp = formatToTimeStamp(t)
self.assertEqual(timestamp, 0)
t2 = "2018-07-10T10:08:39"
timestamp = formatToTimeStamp(t2)
self.assertEqual(timestamp, 1531217319)
t3 = datetime(2018, 7, 10, 10, 8, 39)
timestamp = formatToTimeStamp(t3)
self.assertEqual(timestamp, 1531217319)
def _parse_json_data(self, witness):
parse_times = [
"created", "last_sbd_exchange_update", "hardfork_time_vote",
]
for p in parse_times:
if p in witness and isinstance(witness.get(p), string_types):
witness[p] = formatTimeString(witness.get(p, "1970-01-01T00:00:00"))
parse_int = [
"votes", "virtual_last_update", "virtual_position", "virtual_scheduled_time",
]
for p in parse_int:
if p in witness and isinstance(witness.get(p), string_types):
witness[p] = int(witness.get(p, "0"))
return witness
output.pop("authorperm")
if 'json_metadata' in output:
output["json_metadata"] = json.dumps(output["json_metadata"], separators=[',', ':'])
if "tags" in output:
output.pop("tags")
if "community" in output:
output.pop("community")
parse_times = [
"active", "cashout_time", "created", "last_payout", "last_update",
"max_cashout_time"
]
for p in parse_times:
if p in output:
p_date = output.get(p, datetime(1970, 1, 1, 0, 0))
if isinstance(p_date, (datetime, date)):
output[p] = formatTimeString(p_date)
else:
output[p] = p_date
sbd_amounts = [
"total_payout_value",
"max_accepted_payout",
"pending_payout_value",
"curator_payout_value",
"total_pending_payout_value",
"promoted",
]
for p in sbd_amounts:
if p in output and isinstance(output[p], Amount):
output[p] = output[p].json()
parse_int = [
"author_reputation",
]
def get_curation_penalty(self, vote_time=None):
""" If post is less than 15 minutes old, it will incur a curation
reward penalty.
:param datetime vote_time: A vote time can be given and the curation
penalty is calculated regarding the given time (default is None)
When set to None, the current date is used.
:returns: Float number between 0 and 1 (0.0 -> no penalty, 1.0 -> 100 % curation penalty)
:rtype: float
"""
if vote_time is None:
elapsed_seconds = self.time_elapsed().total_seconds()
elif isinstance(vote_time, str):
elapsed_seconds = (formatTimeString(vote_time) - self["created"]).total_seconds()
elif isinstance(vote_time, (datetime, date)):
elapsed_seconds = (vote_time - self["created"]).total_seconds()
else:
raise ValueError("vote_time must be a string or a datetime")
if self.steem.hardfork >= 21:
reward = (elapsed_seconds / STEEM_REVERSE_AUCTION_WINDOW_SECONDS_HF21)
elif self.steem.hardfork >= 20:
reward = (elapsed_seconds / STEEM_REVERSE_AUCTION_WINDOW_SECONDS_HF20)
else:
reward = (elapsed_seconds / STEEM_REVERSE_AUCTION_WINDOW_SECONDS_HF6)
if reward > 1:
reward = 1.0
return 1.0 - reward
def _parse_json_data(self, vote):
parse_int = [
"rshares", "reputation",
]
for p in parse_int:
if p in vote and isinstance(vote.get(p), string_types):
vote[p] = int(vote.get(p, "0"))
if "time" in vote and isinstance(vote.get("time"), string_types) and vote.get("time") != '':
vote["time"] = formatTimeString(vote.get("time", "1970-01-01T00:00:00"))
elif "timestamp" in vote and isinstance(vote.get("timestamp"), string_types) and vote.get("timestamp") != '':
vote["time"] = formatTimeString(vote.get("timestamp", "1970-01-01T00:00:00"))
else:
vote["time"] = formatTimeString("1970-01-01T00:00:00")
return vote
comment["tags"] = comment['json_metadata']["tags"]
if 'community' in comment['json_metadata']:
comment['community'] = comment['json_metadata']['community']
parse_int = [
"author_reputation",
]
for p in parse_int:
if p in comment and isinstance(comment.get(p), string_types):
comment[p] = int(comment.get(p, "0"))
if "active_votes" in comment:
new_active_votes = []
for vote in comment["active_votes"]:
if 'time' in vote and isinstance(vote.get('time'), string_types):
vote['time'] = formatTimeString(vote.get('time', "1970-01-01T00:00:00"))
parse_int = [
"rshares", "reputation",
]
for p in parse_int:
if p in vote and isinstance(vote.get(p), string_types):
try:
vote[p] = int(vote.get(p, "0"))
except:
vote[p] = int(0)
new_active_votes.append(vote)
comment["active_votes"] = new_active_votes
return comment
return
while True:
# RPC call
for item in self.get_account_history(first, _limit, start=None, stop=None, order=1, raw_output=raw_output):
if raw_output:
item_index, event = item
op_type, op = event['op']
timestamp = event["timestamp"]
block_num = event["block"]
else:
item_index = item['index']
op_type = item['type']
timestamp = item["timestamp"]
block_num = item["block"]
if start is not None and isinstance(start, (datetime, date, time)):
timediff = start - formatTimeString(timestamp)
if timediff.total_seconds() > 0:
continue
elif start is not None and use_block_num and block_num < start:
continue
elif start is not None and not use_block_num and item_index < start:
continue
if stop is not None and isinstance(stop, (datetime, date, time)):
timediff = stop - formatTimeString(timestamp)
if timediff.total_seconds() < 0:
first = max_index + _limit
return
elif stop is not None and use_block_num and block_num > stop:
return
elif stop is not None and not use_block_num and item_index > stop:
return
if exclude_ops and op_type in exclude_ops: