Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_follow_encoding(self):
s = Stream(None, None)
s._start = lambda is_async: None
s.filter(follow=[u'Caf\xe9'])
# Should be UTF-8 encoded
self.assertEqual(u'Caf\xe9'.encode('utf8'), s.body['follow'])
except User.DoesNotExist:
print "User %s does not exist" % user
sys.exit()
auth_dict = UserSocialAuth.objects.values().get(user__id=user.pk)['extra_data']
d = json.loads(auth_dict)
access_key = d['access_token']
d = dict([x.split("=") for x in access_key.split("&")])
ACCESS_KEY = d['oauth_token']
ACCESS_SECRET = d['oauth_token_secret']
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_KEY, ACCESS_SECRET)
TwitterListener.user = user
listener = TwitterListener(api=tweepy.API(auth_handler=auth))
stream = tweepy.streaming.Stream(auth, listener, secure=True)
while True:
try:
stream.userstream()
except KeyboardInterrupt:
break
except Exception, exc:
print sys.argv[1], exc
time.sleep(20)
def open_stream(socketio, track):
if track not in streams:
listener = CustomStreamListener(socketio, track)
auth = get_random_twitter_auth()
stream = tweepy.streaming.Stream(auth, listener)
if track == "everything":
stream.sample(async=True)
elif track.startswith('@'):
track = track[1:]
user_id = tweepy.API(auth).get_user(screen_name=track).id_str
stream.filter(track=[track], follow=[user_id], async=True)
else:
stream.filter(track=[track], async=True)
streams[track] = [stream, 0]
streams[track][1] += 1
return streams[track][0]
getFollowAccounts()
print("getting follow accounts....check")
getWhiteListAccounts()
print("getting whitelist accounts....check")
print("")
print("""\
_____ _ _ _ _____ _____ _____
|_ _|_ _ _|_| |_| |_ ___ ___| __ | |_ _|
| | | | | | | _| _| -_| _| __ -| | | | |
|_| |_____|_|_| |_| |___|_| |_____|_____| |_|
created by vishwenga
Running.... :) """)
try:
twt = Stream(auths, listener())
twt.filter(track=track_words)
except Exception as e:
print(str(e))
pass
def __init__(self, keywords, user, *args, **kwargs):
"""
Twitter Listener constructor. This class is used as middleware for the Twitter Listener
:param keywords: List of keywords
:param user: User to listen
"""
super(TwitterListener, self).__init__(*args, **kwargs)
self.user = None
self.keywords = None
if len(keywords[0]) > 0:
self.keywords = keywords
self.stream = tweepy.streaming.Stream(self.auth, TwitterStreamingListener(keywords))
if len(user) > 0:
try:
self.user = self.api.get_user(user)
self.user_stream = tweepy.streaming.Stream(self.auth, TwitterUserStreamingListener(user))
except Exception:
raise Exception("Error during the listener creation, does the user exists?")
def stream_mentions():
options = {'secure':True}
listener = CustomStreamListener()
s = tweepy.streaming.Stream(auth, listener, **options)
s.userstream()