Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_tweepy_status_populated(tweetsFilePath='tests/testTweetOut.json'):
    """Build tweepy ``Status`` objects from a JSON file of sample tweets.

    The file at ``tweetsFilePath`` is decoded with ``json.load`` and every
    entry of the decoded collection is passed to
    ``tweepy.models.Status().parse`` with ``None`` in place of an API handle.

    :param tweetsFilePath: path of the JSON file to read.
    :return: list of parsed statuses, in the order they appear in the file.
    """
    with open(tweetsFilePath, 'r') as sample_file:
        raw_tweets = json.load(sample_file)
    return [tweepy.models.Status().parse(None, raw) for raw in raw_tweets]
from pandas import DataFrame
# from tweet import compute
from math import exp
@classmethod
def parse(cls, api, raw):
    """Parse ``raw`` with the model's saved parser and keep the raw payload.

    Delegates to ``cls.first_parse(api, raw)`` and stores ``json.dumps(raw)``
    on the resulting object as its ``json`` attribute before returning it.
    """
    parsed = cls.first_parse(api, raw)
    parsed.json = json.dumps(raw)
    return parsed
# Module-level CSV sink: output2.csv is opened in append mode and wrapped in an
# Excel-dialect csv.writer. Nothing in this snippet closes the handle.
metadata = open("output2.csv", 'a')
wr = csv.writer(metadata,dialect='excel')
# Status() is the data model for a tweet
# Keep tweepy's own Status.parse reachable under the name first_parse.
tweepy.models.Status.first_parse = tweepy.models.Status.parse
# tweepy.models.Status.parse = parse
# User() is the data model for a user profile
# Same for User.parse. The replacement parse itself is installed inside
# compute(), not here (the assignments at this level are commented out).
tweepy.models.User.first_parse = tweepy.models.User.parse
# tweepy.models.User.parse = parse
# You need to do it for all the models you need
# compute: installs the `parse` override on tweepy's Status and User models,
# builds an OAuth-authenticated API client and requests a user's timeline.
#   name      -- screen name passed to api.user_timeline.
#   count_num -- not referenced in the lines visible here.
#   max_num   -- not referenced in the lines visible here.
# NOTE(review): the snippet appears to end right after the timeline request;
# `tweets` is never used and nothing is returned in the visible part.
def compute(name, count_num, max_num):
# Swap in the `parse` that is in scope (presumably the classmethod defined
# earlier in this file -- confirm) for both models.
tweepy.models.Status.parse = parse
tweepy.models.User.parse = parse
# consumer_key / consumer_secret / access_token / access_secret are not
# defined in this function; they must come from the enclosing module.
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_secret)
# Printed right after the token is set; no call visible here verifies the
# credentials first.
print("Authentication successful")
api = tweepy.API(auth)
# The request size is fixed at 200 (count_num is not used); retweets and
# replies are excluded.
tweets = api.user_timeline(screen_name= name,
count=200, include_rts=False,
exclude_replies=True)
# NOTE(review): this method is cut off partway through its error-handling
# branch (the logger.error call below is never closed); the comments here only
# describe the lines that are visible.
def parse_status_and_dispatch(self, stream_data):
"""
Process an incoming status message.
"""
# Decode the raw stream payload as JSON and build a tweepy Status from it.
status = tweepy.models.Status.parse(self.api, json.loads(stream_data))
# Statuses accepted by tweet_matchp are counted.
if self.tweet_matchp(status):
self.status_count += 1
# When should_stop() is true, clear the running flag and return False.
if self.should_stop():
self.running = False
return False
# With a field list configured, one value per requested field is resolved
# (csvrow is initialised empty before the loop).
if self.opts.fields:
try:
csvrow = []
for f in self.opts.fields:
try:
# A field that cannot be resolved falls back to None.
value = utils.resolve_with_default(status, f, None)
except AttributeError:
# With terminate_on_error set, the missing field is logged as an error.
if self.opts.terminate_on_error:
self.logger.error(
"Field '%s' not found in tweet id=%s, terminating." % (f, status.id_str))
def init_tweepy():
    """Install the custom ``parse`` on tweepy's ``Status`` and ``User`` models.

    For each model the parser currently in place is saved as ``first_parse``
    and ``parse`` is replaced by the module-level ``parse`` override.

    The function is idempotent: a model that already carries the override is
    left alone. Without that check a second call would save the override
    itself as ``first_parse``, so the override would end up calling itself
    and recurse without end.
    """
    # Status() is the data model for a tweet, User() for a user profile.
    for model in (tweepy.models.Status, tweepy.models.User):
        # Look at the class's own namespace so the stored object itself is
        # compared, not a freshly bound method.
        if vars(model).get('parse') is parse:
            continue
        model.first_parse = model.parse
        model.parse = parse
def exists(tweet):
    """
    Tell whether a tweet is already recorded in the Tweeted table.

    The lookup depends on the exact type of ``tweet``:
    a tweepy Status is matched on its text or its id, a ``tw.Tweet`` on its
    text, URL or full tweet string, a unicode string on the stored text and
    an int on the stored tweet id.

    :param tweet: tweepy Status, tw.Tweet, unicode text or integer tweet id
    :return: True when at least one matching row exists, False otherwise;
             None for any other argument type
    """
    def tweeted_query():
        # DBConnector() is invoked anew for every lookup.
        return pyTweetBot.db.DBConnector().get_session().query(Tweeted)

    kind = type(tweet)

    if kind is tweepy.models.Status:
        return tweeted_query().filter(
            or_(Tweeted.tweet_tweet_text == tweet.text,
                Tweeted.tweet_tweet_id == tweet.id)).count() > 0

    if kind is tw.Tweet:
        return tweeted_query().filter(
            or_(Tweeted.tweet_tweet_text == tweet.get_text(),
                Tweeted.tweet_tweet_text == tweet.get_url(),
                Tweeted.tweet_tweet_text == tweet.get_tweet())).count() > 0

    if kind is unicode:
        return tweeted_query().filter(
            Tweeted.tweet_tweet_text == tweet).count() > 0

    if kind is int:
        return tweeted_query().filter(
            Tweeted.tweet_tweet_id == tweet).count() > 0
def init_tweepy():
    """Install the custom ``parse`` on tweepy's ``Status`` and ``User`` models.

    For each model the parser currently in place is saved as ``first_parse``
    and ``parse`` is replaced by the module-level ``parse`` override.

    The function is idempotent: a model that already carries the override is
    left alone. Without that check a second call would save the override
    itself as ``first_parse``, so the override would end up calling itself
    and recurse without end.
    """
    # Status() is the data model for a tweet, User() for a user profile.
    for model in (tweepy.models.Status, tweepy.models.User):
        # Look at the class's own namespace so the stored object itself is
        # compared, not a freshly bound method.
        if vars(model).get('parse') is parse:
            continue
        model.first_parse = model.parse
        model.parse = parse
@property
def probability(self):
    """Bot probability for this account.

    Raises RuntimeError when no ``user_id`` is set. While the status is
    PENDING, ``_get_result()`` is called and the property is evaluated again.
    An UNAVAILABLE status yields 0.0; otherwise the stored ``_probability``
    is returned.
    """
    if not self.user_id:
        raise RuntimeError("Cannot use Botometer without an account ID")

    if self.status is BotometerStatus.PENDING:
        self._get_result()
        return self.probability

    unavailable = self.status is BotometerStatus.UNAVAILABLE
    # let's assume it's not a bot when no result is available
    return 0.0 if unavailable else self._probability
class User(models.User):
"""A wrapper for Tweepy native User model, including custom methods to
check idle accounts and bots"""
def __init__(self, *args, **kwargs):
# Forward all arguments to the tweepy model, then attach a fresh
# BotometerResult to the instance.
super().__init__(*args, **kwargs)
self.botometer_result = BotometerResult()
@classmethod
def parse(cls, api, data):
"""This is how Tweepy instantiate User models from the API result"""
# Delegates unchanged to the parent class's parse.
return super(User, cls).parse(api, data)
def last_status_before(self, **kwargs):
"""Takes any kwarg compatible with Python's `timedelta` and says
whether the user's last tweet is older than the `timedelta` defined by
these kwargs"""
# NOTE(review): only the docstring of last_status_before is visible in this
# snippet; its implementation is not shown here.
def outputHomepageURLs(api,fpath,tw,tag):
    """Write the homepage URLs of the tweepy users in ``tw`` to a file.

    Every value of ``tw`` whose exact type is ``tweepy.models.User`` and that
    has a non-empty ``url`` is scanned for http/https URLs; each URL found is
    written as a ``<url>,<tag>`` line.

    :param api: not used by this function; kept for interface compatibility.
    :param fpath: passed to ``openTimestampedFile`` together with the file
        name ``'homepageurls.txt'``.
    :param tw: mapping whose values are inspected (iterated by key).
    :param tag: string appended after the comma on every output line.
    """
    # Parenthesised single-argument form prints the same under Python 2 and 3.
    print(os.getcwd())
    # Raw string keeps the pattern byte-identical while avoiding invalid
    # string escapes; compiled once instead of once per user.
    url_pattern = re.compile(
        r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')
    f = openTimestampedFile(fpath, 'homepageurls.txt', False)
    try:
        for key in tw:
            user = tw[key]
            # Exact type check kept on purpose: subclasses were skipped
            # before and still are.
            if type(user) is not tweepy.models.User:
                continue
            homepage = user.url
            if not homepage:
                continue
            for url in url_pattern.findall(homepage):
                f.write(url + ',' + tag + '\n')
    finally:
        # Close the file even when a lookup or a write fails.
        f.close()
def exists(tweet):
    """
    Tell whether a tweet is already recorded in the Tweeted table.

    The lookup depends on the exact type of ``tweet``:
    a tweepy Status is matched on its text or its id, a ``tw.Tweet`` on its
    text, URL or full tweet string, a unicode string on the stored text and
    an int on the stored tweet id.

    :param tweet: tweepy Status, tw.Tweet, unicode text or integer tweet id
    :return: True when at least one matching row exists, False otherwise;
             None for any other argument type
    """
    def tweeted_query():
        # DBConnector() is invoked anew for every lookup.
        return pyTweetBot.db.DBConnector().get_session().query(Tweeted)

    kind = type(tweet)

    if kind is tweepy.models.Status:
        return tweeted_query().filter(
            or_(Tweeted.tweet_tweet_text == tweet.text,
                Tweeted.tweet_tweet_id == tweet.id)).count() > 0

    if kind is tw.Tweet:
        return tweeted_query().filter(
            or_(Tweeted.tweet_tweet_text == tweet.get_text(),
                Tweeted.tweet_tweet_text == tweet.get_url(),
                Tweeted.tweet_tweet_text == tweet.get_tweet())).count() > 0

    if kind is unicode:
        return tweeted_query().filter(
            Tweeted.tweet_tweet_text == tweet).count() > 0

    if kind is int:
        return tweeted_query().filter(
            Tweeted.tweet_tweet_id == tweet).count() > 0
def parse_response_list(self, response_list, extra_fields=None):
    """Convert a list of tweepy models into parsed instances.

    Each element of ``response_list`` that is a ``tweepy.models.Model`` is
    passed to ``self.parse_response_object`` together with ``extra_fields``.
    Any other element is reported through ``log.error`` and skipped.

    :param response_list: iterable of API response objects.
    :param extra_fields: forwarded unchanged to ``parse_response_object``.
    :return: list of the parsed instances, in input order.
    """
    instances = []
    for response in response_list:
        if not isinstance(response, tweepy.models.Model):
            # Format through a one-element tuple: a bare tuple response would
            # otherwise be unpacked as the % arguments and raise TypeError.
            log.error("Resource %s is not dictionary" % (response,))
            continue
        instances.append(self.parse_response_object(response, extra_fields))
    return instances