def test_all_comments(self):
    c_len = len(self.submission.comments)
    flat = helpers.flatten_tree(self.submission.comments)
    continue_items = [x for x in flat if isinstance(x, MoreComments)
                      and x.count == 0]
    self.assertTrue(continue_items)
    cf_len = len(flat)
    saved = self.submission.replace_more_comments(threshold=2)
    ac_len = len(self.submission.comments)
    flat = helpers.flatten_tree(self.submission.comments)
    acf_len = len(flat)
    # Every zero-count "continue this thread" stub must survive replacement
    for item in continue_items:
        self.assertIn(item.id, [x.id for x in flat])
    self.assertEqual(len(self.submission._comments_by_id), acf_len)
    # Replacement should only ever grow the tree, flattened or not
    self.assertLess(c_len, ac_len)
    self.assertLess(c_len, cf_len)
    self.assertLess(ac_len, acf_len)
    self.assertLess(cf_len, acf_len)
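
# A minimal standalone sketch of the replace/flatten flow the test above
# exercises, using the PRAW 3 API. The user agent and submission ID are
# placeholders, not values from the original code.
import praw
from praw import helpers

r = praw.Reddit(user_agent="flatten_tree example (PRAW 3)")
submission = r.get_submission(submission_id="92dd8")  # hypothetical ID
submission.replace_more_comments(limit=None, threshold=0)
flat = helpers.flatten_tree(submission.comments)
print("flattened comments:", len(flat))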
try:
    r = praw.Reddit("aamnews by /u/andreim at github.com/andreimarcu/aamnews")
    sub = r.get_submission(url)
    # Check if a feed already exists for another channel
    c.execute("SELECT feed_id FROM feed_reddit_comments WHERE article_id=?",
              (sub.id,))
    result = c.fetchone()
    if result is None:
        c.execute("INSERT INTO feeds (type_name) VALUES (?)", ("reddit_comments",))
        feed_id = c.lastrowid
        c.execute("INSERT INTO feed_reddit_comments (feed_id, article_id) VALUES (?,?)",
                  (feed_id, sub.id))
        # Seed the items table with every comment currently in the thread
        items = praw.helpers.flatten_tree(sub.comments)
        for item in items:
            c.execute("INSERT INTO items (feed_id, unique_id) VALUES (?,?)",
                      (feed_id, item.id))
    else:
        feed_id = result[0]
    c.execute("INSERT INTO channel_feeds (feed_id, channel_id, name) VALUES (?,?,?)",
              (feed_id, channel_id, name))
    conn.commit()
    conn.close()
    return p.say("Added " + name)
except Exception as exc:
    # Assumed handler: the original snippet is truncated at this point
    return p.say("Failed to add {}: {}".format(name, exc))
def start():
    reddit = praw.Reddit(user_agent=USER_AGENT)
    reddit.login(REDDIT_USERNAME, REDDIT_PASS)
    comment_stream = praw.helpers.comment_stream(reddit, "all", verbosity=1)
    n = 0
    for comment in comment_stream:
        n += 1
        # Print a progress marker every 1000 comments
        if not n % 1000:
            print(n)
        for website in websites:
            if website[0] in comment.body.lower():
                got_one(comment)
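
# collect_points (below) relies on constants that the snippet doesn't show.
# Plausible stand-in definitions -- assumptions, not the author's values:
import re

USERAGENT = "collect_points example script"         # hypothetical user agent
LIST_RE = re.compile(r"\s*(?:[-*+]|\d+[.)])\s+")    # line that opens a Markdown list
LIST_LINESTART_RE = re.compile(r"^(\s*)(?:[-*+]|\d+[.)])\s+")  # bullet to normalise
LIST_OUTPUTFILE = "collected_points.txt"            # hypothetical output path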
def collect_points(url):
    r = praw.Reddit(USERAGENT)
    thread = r.get_submission(url=url)
    if thread is None:
        print("Thread not found")
        return
    comments = praw.helpers.flatten_tree(thread.comments)
    result = []
    for comment in comments:
        if not isinstance(comment, praw.objects.Comment):
            continue  # MoreComments stubs have no body
        is_in_list = False
        was_empty = True
        for line in comment.body.splitlines():
            # A list is recognised when its first line follows a blank line
            if was_empty:
                is_in_list = LIST_RE.match(line)
            was_empty = line == ""
            if not was_empty and is_in_list:
                # Normalise whatever bullet style was used to "* "
                result.append(LIST_LINESTART_RE.sub(r"\1* ", line))
    with open(LIST_OUTPUTFILE, "w") as f:
        f.write("\n".join(result))
from collections import Counter  # assumed import; the fragment uses Counter

def get_vocabulary(comments, get_all_comments=True):
    # The original fragment begins at the elif branch below; this head is
    # reconstructed from the names the fragment uses and is an assumption.
    vocab = Counter()
    num_comments = 0
    for comment in comments:
        if isinstance(comment, praw.objects.Comment):
            # Assumed tokenisation; the original counting logic is not shown
            vocab.update(comment.body.lower().split())
            num_comments += 1
        elif isinstance(comment, praw.objects.MoreComments) and get_all_comments:
            # Recurse into unexpanded "load more comments" stubs
            new_vocab, num_new_comments = get_vocabulary(comment.comments)
            vocab += new_vocab
            num_comments += num_new_comments
    return vocab, num_comments

def build_vocabulary(self, subreddit_name, limit=None):
    # Hypothetical method head: the original fragment starts mid-method
    subreddit = self.reddit.get_subreddit(subreddit_name)
    print("Comments processed for subreddit '{}': 0".format(subreddit_name), end=" ")
    # Initialise loop variables
    vocabulary = Counter()
    comments_processed = 0
    for submission in subreddit.get_hot(limit=None):
        comments = praw.helpers.flatten_tree(submission.comments)
        # Run over all comments in the submission
        submission_vocab, num_new_comments = get_vocabulary(comments)
        vocabulary += submission_vocab
        comments_processed += num_new_comments
        print("{}...".format(comments_processed), end=" ")
        if limit and comments_processed >= limit:
            break
    print("{}. Finished!".format(comments_processed))
    return vocabulary
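
# Hypothetical usage of the reconstructed pieces above (class name assumed;
# not runnable without the enclosing scraper class):
# scraper = RedditVocabularyScraper()
# vocab = scraper.build_vocabulary("learnpython", limit=10000)
# print(vocab.most_common(20))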
import time
import praw
import re
import requests
from datetime import datetime
RETRY_MINUTES = 3
# regex matching direct links to image files (by extension)
urlregex = r"(\.jpg|\.jpeg|\.png|\.gif|\.gifv|\.apng|\.tiff|\.bmp|\.xcf)$"
# regex for all the allowed domains
domainregex = r"^(500px\.com|abload\.de|cdn\.theatlantic\.com|.*\.deviantart\.com|.*\.deviantart\.net|fav\.me|.*\.fbcdn\.net|.*\.files\.wordpress\.com|flic\.kr|flickr\.com|forgifs\.com|gfycat\.com|(.*\.)?gifsoup\.com|(.*\.)?gyazo\.com|(.*\.)?imageshack\.us|imgclean\.com|(i\.)?imgur\.com|instagr\.am|instagram\.com|(cdn\.)?mediacru\.sh|(.*\.)?media\.tumblr\.com|(.*\.)?min\.us|(.*\.)?minus\.com|(.*\.)?panoramio\.com|photoburst\.net|(.*\.)?photoshelter\.com|pbs\.twimg\.com|(.*\.)?photobucket\.com|picsarus\.com|puu\.sh|scontent\.cdninstagram\.com|(.*\.)?staticflickr\.com|(.*\.)?tinypic\.com|twitpic\.com|upload\.wikimedia\.org|i\.reddituploads\.com)"
# fires up praw, connects to reddit, and identifies the bot
r = praw.Reddit('ImagesOf v5.1 /u/amici_ursi', api_request_delay=1)
# identifies the stream of submissions that we're going to swim in
submission_stream = praw.helpers.submission_stream(r, 'all')
globalsubredditblacklist = set()
globaluserblacklist = set()
californiasubredditblacklist = set()
chinasubredditblacklist = set()
indiasubredditblacklist = set()
log = []
# searches the stream for all the criteria
def search_for_places(r):
    print("searching for posts...")
    for post in submission_stream:
        # afghanistan (the original continues with more countries; the call
        # is closed here because the snippet is truncated)
        swim(r,
             submission=post,
             goodregex=r"(\bafghanistan\b|\bkabul\b|\bbamyan\b|\bkandahar\b|\bbamyan province\b|\bherat\b|\bband-e amir national park\b|\bjalalabad\b|\bmazar-i-sharif\b|\bpaghman\b|\bkunduz\b|\bghazni\b|\bbalkh\b|\bbaghlan province\b|\bsistan\b|\bbagram\b|\bbadghis province\b|\bkhost\b|\blashkar gah\b|\bfayzabad\b|\bmaymana\b|\bpuli khumri\b|\bsheberghan\b|\bfarah\b|\btalogan\b|\bsamangan\b|\bcharikar\b|\bmes aynak\b|\bsange-e-masha\b)",
             postinto="imagesofafghanistan")
        continue
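
# "swim" is not defined in this snippet. A stub with the assumed signature,
# for illustration only -- not the original bot's implementation:
def swim(r, submission, goodregex, postinto):
    # Assumed behaviour: mirror the submission into the target subreddit
    # when its title matches the place regex
    if re.search(goodregex, submission.title.lower()):
        r.submit(postinto, submission.title, url=submission.url)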
# Policy: the bot doesn't reply to comments that already have replies
if len(comment.replies) > 0:
    continue
cursor.execute("SELECT time FROM removed_comments WHERE parent_id = %s",
               [comment.name])
if cursor.fetchone() is not None:
    continue
if log_activity:
    cursor.execute("SELECT time FROM replied WHERE id = %s", [comment.name])
    if cursor.fetchone() is not None:
        continue
flat_comments = praw.helpers.flatten_tree(comment.submission.comments)
# Policy: the bot only comments TWICE in any thread. Increased July 2015
bot_comments = sum(1 for reply in flat_comments
                   if hasattr(reply, 'author')
                   and reply.author is not None
                   and reply.author.name == user.name)
if bot_comments >= 2:
    continue
# Policy: the bot doesn't reply in a thread if *anyone* on the user blacklist
# has also commented. Removed July 2015
if False:
    blacklisted_replies = sum(1 for reply in flat_comments
                              if hasattr(reply, 'author')
                              and reply.author is not None
                              and reply.author.name in user_blacklist)
    if blacklisted_replies:
        continue  # assumed follow-up; the snippet ends before using the count
def flatten(self, submission):
    submission = self.reddit.get_submission(submission_id=submission,
                                            comment_limit=None)
    flattened = praw.helpers.flatten_tree(submission.comments)
    text_mass = ""
    for comment in flattened:
        if isinstance(comment, praw.objects.Comment):
            # Strip URLs first, then HTML tags, entities, and path-like tokens
            body = re.sub(r"https?://(?:www\.)?[A-Za-z0-9-]+\.[A-Za-z.]+[?A-Za-z0-9&=/]*",
                          "", comment.body, flags=re.IGNORECASE)
            body = re.sub(r"<.*?>|&.*?;|/.+?(?= )|/.*", "", body)
            text_mass += body + "\n"
    return text_mass
def _getCommentURLs(self, submission):
    """
    Get image URLs linked from comments in the submission.

    :type submission: praw.objects.Submission
    :rtype: dict
    """
    urls = {}
    if self._commentCache.get(submission.id) is None:
        try:
            allComments = praw.helpers.flatten_tree(submission.comments)
        except Exception:
            return urls
        self._cacheComments(allComments, submission.id)
    else:
        allComments = self._commentCache.get(submission.id)
    for comment in allComments:
        if isinstance(comment, praw.objects.Comment):  # Skip MoreComments objects
            author = comment.author
            if author is None:
                author = "[Deleted]"
            else:
                author = author.name
            if self.getAuthorsCommentsOnly and (author != submission.author.name
                                                and author != '[Deleted]'):
                continue
            matches = self._urlFinder.findall(comment.body)
            authorURLs = urls.get(author)
            # Assumed tail: the snippet ends here; accumulate URLs per author
            if authorURLs is None:
                urls[author] = matches
            else:
                authorURLs.extend(matches)
    return urls
def _get_mentions_in_submission(counter: int,
                                submission_: praw.objects.Submission) -> Optional[PostWithMentions]:
    """Finds mentions of a course in a submission's title, selftext, and comments.

    :param counter: counter to print in table row
    :type counter: int
    :param submission_: a praw submission object
    :type submission_: praw.objects.Submission
    :return: a PostWithMentions object which has the post ID and a list of strings of mentions
    :rtype: PostWithMentions, None
    """
    mentions_list = []
    mentions_list.extend(_get_mentions_in_string(submission_.title))
    mentions_list.extend(_get_mentions_in_string(submission_.selftext))
    submission_.replace_more_comments(limit=None, threshold=0)
    flat_comments = praw.helpers.flatten_tree(submission_.comments)
    # TODO replace look-before-you-leap with try/except
    for comment in flat_comments:
        if comment.author is None or comment.author.name == 'ucsc-class-info-bot':
            continue
        mentions_list.extend(_get_mentions_in_string(comment.body))
    mentions_list = _remove_list_duplicates_preserve_order(mentions_list)
    author = submission_.author
    if author is None:
        author_name = "[deleted]"
    else:
        author_name = author.name
    print(" ".join([trunc_pad(str(counter), 'num'),