Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def twitter_stream(client, project_name, topic, track_list):
"""Connects to Twitter stream API."""
print 'Connecting to Twitter...'
with open('twitter.json') as f:
twitter_cred = json.load(f)
auth = tweepy.auth.OAuthHandler(twitter_cred['consumer_key'], twitter_cred['consumer_secret'])
auth.set_access_token(twitter_cred['access_token'], twitter_cred['access_token_secret'])
watcher = StreamWatcherListener(client=client, project=project_name, topic=topic)
stream = tweepy.Stream(auth, watcher, timeout=None)
track_list = [k for k in track_list.split(',')]
stream.filter(None, track_list)
out = ["ca","tx"]
else:
out = (set(args.out) & set(choices))
print "Q: %s, LOC: %s, LANG: %s, TYPE: %s, COUNT: %s, OUT: %s" %(qw ,ge ,l ,t ,c, out)
# Authentication (OAuth)
CKEY = "A2hLT3URk6OZ1TwGCNOmpw"
CSEC = "VCooTyH13GQ0BCYt2revG3ycv8IHG3Ki3UrHTe1KJ8"
ATOK = "71060978-si9zmRyZ4a9scK91nAuMUOZhzdYewMpepmZbPv02g"
ATSC = "Cl6v15svy2uUI55idBAkt9GRwABoQ9ZWCv7cNhq0Fs"
#f = open('./auth.k')
#f.close()
auth1 = tweepy.auth.OAuthHandler(CKEY, CSEC)
auth1.set_access_token(ATOK, ATSC)
api = tweepy.API(auth1)
# Searching
counter = 0
for tweet in tweepy.Cursor(api.search,
q = qw,
g = ge,
lang = l,
result_type = t,
count = c).items():
#print type(tweet)
for ox in out:
if ox == 'ca':
def tw_oauth(authfile):
with open(authfile, "r") as f:
ak = f.readlines()
f.close()
auth1 = tweepy.auth.OAuthHandler(ak[0].replace("\n",""), ak[1].replace("\n",""))
auth1.set_access_token(ak[2].replace("\n",""), ak[3].replace("\n",""))
return tweepy.API(auth1)
import time
import json
from config import *
import markovify
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn import svm
from sklearn.metrics import classification_report
import tweepy
from tweepy import Stream
from tweepy.streaming import StreamListener
auth = tweepy.auth.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
api = tweepy.API(auth)
#Load Hierarchical Attention Network
json_file = open('model.json', 'r')
loaded_model_json = json_file.read()
json_file.close()
loaded_model = model_from_json(loaded_model_json)
# load weights into new model
loaded_model.load_weights("model.h5")
print("Loaded model from disk")
def clean_str(string):
"""
Tokenization/string cleaning for dataset
def getAuthenticatedAPI(self):
try:
auth = tweepy.auth.OAuthHandler(self.options_string['hidden_application_key'], self.options_string['hidden_application_secret'], secure=True)
auth.set_access_token(self.options_string['hidden_access_token'], self.options_string['hidden_access_token_secret'])
return tweepy.API(auth)
except Exception,e:
logger.error(e)
return None
db["triples"].append(triple)
json.dump(triple, f)
f.write('\n')
print("Processed: {}".format(triple))
with open(os.path.join(this_files_path, private_twitter_creds_path), 'r') as f:
keys = json.load(f)
consumer_key = keys["consumer_key"]
consumer_secret = keys["consumer_secret"]
access_token = keys["access_token"]
access_token_secret = keys["access_token_secret"]
twitter_handle = keys["handle"] if keys["handle"][0] != '@' else keys["handle"][1:]
auth = tweepy.auth.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)
if api.verify_credentials:
print("Successfully authenticated!")
else:
print("There was a mistake made when receiving credentials. Remove 'private_twitter_credentials.json' and try the script again")
db = {"triples": []}
# make a list because we can't re-use a tweepy.Cursor
your_friends = [user for user in tweepy.Cursor(api.friends, screen_name=twitter_handle).items()]
process_group(
twitter_handle,
"follows",
[user.screen_name for user in your_friends]
def tw_oauth():
authfile = 'auth.k'
with open(authfile, "r") as f:
ak = f.readlines()
f.close()
auth1 = tweepy.auth.OAuthHandler(ak[0].replace("\n",""), ak[1].replace("\n",""))
auth1.set_access_token(ak[2].replace("\n",""), ak[3].replace("\n",""))
return tweepy.API(auth1)
from .utils import *
import tweepy
auth = tweepy.auth.OAuthHandler(env("TWITTER_CONSUMER_KEY"),
env("TWITTER_CONSUMER_SECRET"))
auth.set_access_token(env("TWITTER_ACCESS_KEY"),
env("TWITTER_ACCESS_SECRET"))
locations = [
-7.93, 55.40, -0.53, 60.92,
-5.65, 51.60, 2.33, 56.13,
-5.52, 50.60, 1.67, 51.84,
-6.64, 49.82, 1.54, 51.29,
-8.28, 54.07, -5.27, 55.35
]
callback = None
class StreamWatcherListener(tweepy.StreamListener):
def __init__(self, callback, api=None):
import tweepy
import oauth2 as oauth
from linkedin import linkedin
auth = tweepy.auth.OAuthHandler('ihCnDKwZWZoXpeb7z3rQQ', 'TGcMdTGcxIQVC8dWeVrTNjjJhaU3GrsREE3U6jCqWc')
auth.set_access_token('1320599683-uTYgL1xAIWMUnIJlK9rKMFigukmoc6z5eGdnrDI', 'LW3oTRP69Qof08Act9iH13HzfN4ZOAzihxvvrUMWqI')
api = tweepy.API(auth)
# print api.me().name
# for status in tweepy.Cursor(api.user_timeline,id='microsoft').items(15):
# print status.text+'\n'
store = []
for tweet in tweepy.Cursor(api.search, q="Tesla Government", rpp=10000,
result_type="recent",
#include_entities=True,
lang="en",
with_twitter_user_id=True).items(10):
#print tweet.id, tweet.user.name, tweet.created_at, tweet.text
store.append(tweet.user.name)
print "\n"
print store
def oauth():
# Fill in your keys here:
consumer_key = 'XXXXXXXXXXXXXXXXXXXX'
consumer_secret = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
access_key = 'XXXXXXXX-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
access_secret = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_key, access_secret)
return auth