Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_authorperm_resolve(self):
self.assertEqual(resolve_authorperm('https://d.tube/#!/v/pottlund/m5cqkd1a'),
('pottlund', 'm5cqkd1a'))
self.assertEqual(resolve_authorperm("https://steemit.com/witness-category/@gtg/24lfrm-gtg-witness-log"),
('gtg', '24lfrm-gtg-witness-log'))
self.assertEqual(resolve_authorperm("@gtg/24lfrm-gtg-witness-log"),
('gtg', '24lfrm-gtg-witness-log'))
self.assertEqual(resolve_authorperm("https://busy.org/@gtg/24lfrm-gtg-witness-log"),
('gtg', '24lfrm-gtg-witness-log'))
self.assertEqual(resolve_authorperm('https://dlive.io/livestream/atnazo/61dd94c1-8ff3-11e8-976f-0242ac110003'),
('atnazo', '61dd94c1-8ff3-11e8-976f-0242ac110003'))
def test_authorperm_resolve(self):
self.assertEqual(resolve_authorperm('https://d.tube/#!/v/pottlund/m5cqkd1a'),
('pottlund', 'm5cqkd1a'))
self.assertEqual(resolve_authorperm("https://steemit.com/witness-category/@gtg/24lfrm-gtg-witness-log"),
('gtg', '24lfrm-gtg-witness-log'))
self.assertEqual(resolve_authorperm("@gtg/24lfrm-gtg-witness-log"),
('gtg', '24lfrm-gtg-witness-log'))
self.assertEqual(resolve_authorperm("https://busy.org/@gtg/24lfrm-gtg-witness-log"),
('gtg', '24lfrm-gtg-witness-log'))
self.assertEqual(resolve_authorperm('https://dlive.io/livestream/atnazo/61dd94c1-8ff3-11e8-976f-0242ac110003'),
('atnazo', '61dd94c1-8ff3-11e8-976f-0242ac110003'))
def __init__(
self,
authorperm,
full=True,
lazy=False,
steem_instance=None
):
self.full = full
self.lazy = lazy
self.steem = steem_instance or shared_steem_instance()
if isinstance(authorperm, string_types) and authorperm != "":
[author, permlink] = resolve_authorperm(authorperm)
self["id"] = 0
self["author"] = author
self["permlink"] = permlink
self["authorperm"] = authorperm
elif isinstance(authorperm, dict) and "author" in authorperm and "permlink" in authorperm:
authorperm["authorperm"] = construct_authorperm(authorperm["author"], authorperm["permlink"])
authorperm = self._parse_json_data(authorperm)
super(Comment, self).__init__(
authorperm,
id_item="authorperm",
lazy=lazy,
full=full,
steem_instance=steem_instance
)
def refresh(self):
if self.identifier == "":
return
if not self.steem.is_connected():
return
[author, permlink] = resolve_authorperm(self.identifier)
self.steem.rpc.set_next_node_on_empty_reply(True)
if self.steem.rpc.get_use_appbase():
try:
if self.use_tags_api:
content = self.steem.rpc.get_discussion({'author': author, 'permlink': permlink}, api="tags")
else:
content =self.steem.rpc.list_comments({"start": [author, permlink], "limit": 1, "order": "by_permlink"}, api="database")
if content is not None and "comments" in content:
content =content["comments"]
if isinstance(content, list) and len(content) >0:
content =content[0]
except:
content = self.steem.rpc.get_content(author, permlink)
else:
content = self.steem.rpc.get_content(author, permlink)
if not content or not content['author'] or not content['permlink']:
:param float weight: Voting weight. Range: -100.0 - +100.0.
:param str identifier: Identifier for the post to vote. Takes the
form ``@author/permlink``.
:param str account: (optional) Account to use for voting. If
``account`` is not defined, the ``default_account`` will be used
or a ValueError will be raised
"""
if not account:
if "default_account" in self.config:
account = self.config["default_account"]
if not account:
raise ValueError("You need to provide an account")
account = Account(account, blockchain_instance=self)
[post_author, post_permlink] = resolve_authorperm(identifier)
vote_weight = int(float(weight) * STEEM_1_PERCENT)
if vote_weight > STEEM_100_PERCENT:
vote_weight = STEEM_100_PERCENT
if vote_weight < -STEEM_100_PERCENT:
vote_weight = -STEEM_100_PERCENT
op = operations.Vote(
**{
"voter": account["name"],
"author": post_author,
"permlink": post_permlink,
"weight": vote_weight
})
return self.finalizeOp(op, account, "posting", **kwargs)
async def lang0(ctx, link):
curator = re.sub(r'\d{4}|\W|(TravelFeed)','',str(ctx.message.author),re.IGNORECASE|re.DOTALL)
if not ctx.message.channel.id == commandchannel:
await send_discord("Bot commands are only allowed in #bot-commands", ctx.message.channel.id)
return
if not ctx.message.author.id in discordcuratorlist:
await send_discord("Curator unauthorised: "+curator, logchannel)
return
await bot.add_reaction(ctx.message, "⏳")
author, permlink = resolve_authorperm(link)
authorperm = construct_authorperm(author, permlink)
post = Comment(authorperm)
actionqueue.put(Post_Action(post, "lang0", None, ctx.message))
.. note:: A post/comment can only be deleted as long as it has no
replies and no positive rshares on it.
"""
if not account:
if "default_account" in self.steem.config:
account = self.steem.config["default_account"]
if not account:
raise ValueError("You need to provide an account")
account = Account(account, steem_instance=self.steem)
if not identifier:
post_author = self["author"]
post_permlink = self["permlink"]
else:
[post_author, post_permlink] = resolve_authorperm(identifier)
op = operations.Delete_comment(
**{"author": post_author,
"permlink": post_permlink})
return self.steem.finalizeOp(op, account, "posting")
async def on_reaction_add(reaction, user):
"""Initiate curation process by adding a reaction"""
if reaction.message.content.startswith('http'):
curator = re.sub(r'\d{4}|\W|(TravelFeed)','',str(user),re.IGNORECASE|re.DOTALL)
if not user.id in discordcuratorlist and not user.id == botid:
"""Checks if user who added reaction is a curator"""
await send_discord("Curator unauthorised: "+curator, logchannel)
return
else:
author, permlink = resolve_authorperm(reaction.message.content)
post = Comment(construct_authorperm(author, permlink))
if reaction.emoji == '🌍':
await bot.add_reaction(reaction.message, "⏳")
actionqueue.put(Post_Action(post, "tf100", curator, reaction.message))
elif reaction.emoji == '🌐':
await bot.add_reaction(reaction.message, "⏳")
actionqueue.put(Post_Action(post, "tf50", curator, reaction.message))
elif reaction.emoji == '👥':
await bot.add_reaction(reaction.message, "⏳")
actionqueue.put(Post_Action(post, "coop100", None, reaction.message))
elif reaction.emoji == '👋':
await bot.add_reaction(reaction.message, "⏳")
actionqueue.put(Post_Action(post, "ad10", curator, reaction.message))
elif reaction.emoji == '📏':
await bot.add_reaction(reaction.message, "⏳")
actionqueue.put(Post_Action(post, "short0", None, reaction.message))
or a ValueError will be raised
:param str identifier: Identifier for the post to vote. Takes the
form ``@author/permlink``.
"""
if not account:
if "default_account" in self.steem.config:
account = self.steem.config["default_account"]
if not account:
raise ValueError("You need to provide an account")
account = Account(account, steem_instance=self.steem)
if not identifier:
post_author = self["author"]
post_permlink = self["permlink"]
else:
[post_author, post_permlink] = resolve_authorperm(identifier)
vote_weight = int(float(weight) * STEEM_1_PERCENT)
if vote_weight > STEEM_100_PERCENT:
vote_weight = STEEM_100_PERCENT
if vote_weight < -STEEM_100_PERCENT:
vote_weight = -STEEM_100_PERCENT
op = operations.Vote(
**{
"voter": account["name"],
"author": post_author,
"permlink": post_permlink,
"weight": vote_weight
})
return self.steem.finalizeOp(op, account, "posting", **kwargs)
def get_reblogged_by(self, identifier=None):
"""Shows in which blogs this post appears"""
if not identifier:
post_author = self["author"]
post_permlink = self["permlink"]
else:
[post_author, post_permlink] = resolve_authorperm(identifier)
if not self.steem.is_connected():
return None
self.steem.rpc.set_next_node_on_empty_reply(False)
if self.steem.rpc.get_use_appbase():
return self.steem.rpc.get_reblogged_by({'author': post_author, 'permlink': post_permlink}, api="follow")['accounts']
else:
return self.steem.rpc.get_reblogged_by(post_author, post_permlink, api="follow")