Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, twitter_user=None):
self.auth = TwitterAuthenticator().authenticate_twitter_app()
self.twitter_client = API(self.auth)
self.twitter_user = twitter_user
out = (set(args.out) & set(choices))
print "Q: %s, LOC: %s, LANG: %s, TYPE: %s, COUNT: %s, OUT: %s" %(qw ,ge ,l ,t ,c, out)
# Authentication (OAuth)
CKEY = "A2hLT3URk6OZ1TwGCNOmpw"
CSEC = "VCooTyH13GQ0BCYt2revG3ycv8IHG3Ki3UrHTe1KJ8"
ATOK = "71060978-si9zmRyZ4a9scK91nAuMUOZhzdYewMpepmZbPv02g"
ATSC = "Cl6v15svy2uUI55idBAkt9GRwABoQ9ZWCv7cNhq0Fs"
#f = open('./auth.k')
#f.close()
auth1 = tweepy.auth.OAuthHandler(CKEY, CSEC)
auth1.set_access_token(ATOK, ATSC)
api = tweepy.API(auth1)
# Searching
counter = 0
for tweet in tweepy.Cursor(api.search,
q = qw,
g = ge,
lang = l,
result_type = t,
count = c).items():
#print type(tweet)
for ox in out:
if ox == 'ca':
print tweet.created_at, "\t",
if ox == 'tx':
def import_from_twitter(tweet_id):
if not CONFIG:
configure()
auth = tweepy.OAuthHandler(CONFIG['consumer_key'], CONFIG['consumer_secret'])
auth.set_access_token(CONFIG['access_token'], CONFIG['access_token_secret'])
api = tweepy.API(auth)
status = api.get_status(tweet_id)
return create_module(status.text)
if len(rows) == 1:
c = rows.first()
elif not len(rows):
current.log.error("s3msg", "No Twitter channels configured for login")
return None
if not c.consumer_key:
current.log.error("s3msg", "Twitter channel has no consumer key")
return None
try:
oauth = tweepy.OAuthHandler(c.consumer_key,
c.consumer_secret)
oauth.set_access_token(c.access_token,
c.access_token_secret)
twitter_api = tweepy.API(oauth)
return (twitter_api, c.twitter_account)
except:
return None
def tweepy_api(self):
if hasattr(self, '_tweepy_api'):
return self._tweepy_api
auth = tweepy.OAuthHandler(
os.getenv('TWITTER_CONSUMER_KEY'),
os.getenv('TWITTER_CONSUMER_SECRET'),
)
auth.set_access_token(
os.getenv('TWITTER_ACCESS_KEY'),
os.getenv('TWITTER_ACCESS_SECRET')
)
self._tweepy_api = tweepy.API(auth)
return self._tweepy_api
if form_vars.pin and s3.twitter_request_key and s3.twitter_request_secret:
try:
import tweepy
except:
raise HTTP(501, body=T("Can't import tweepy"))
oauth = tweepy.OAuthHandler(settings.twitter_oauth_consumer_key,
settings.twitter_oauth_consumer_secret)
oauth.set_request_token(s3.twitter_request_key,
s3.twitter_request_secret)
try:
oauth.get_access_token(form_vars.pin)
form_vars.oauth_key = oauth.access_token.key
form_vars.oauth_secret = oauth.access_token.secret
twitter = tweepy.API(oauth)
form_vars.twitter_account = twitter.me().screen_name
form_vars.pin = "" # we won't need it anymore
return
except tweepy.TweepError:
session.error = T("Settings were reset because authenticating with Twitter failed")
# Either user asked to reset, or error - clear everything
for k in ["oauth_key", "oauth_secret", "twitter_account"]:
form_vars[k] = None
for k in ["twitter_request_key", "twitter_request_secret"]:
s3[k] = ""
def get_all_tweets(screen_name):
# authorize twitter, initialize tweepy
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_key, access_secret)
api = tweepy.API(auth)
# initialize a list to hold all the tweepy Tweets
alltweets = []
# make initial request for most recent tweets (200 is the maximum allowed count)
new_tweets = api.user_timeline(screen_name=screen_name, count=200)
# save most recent tweets
alltweets.extend(new_tweets)
# save the id of the oldest tweet less one
oldest = alltweets[-1].id - 1
# keep grabbing tweets until there are no tweets left to grab
while len(new_tweets) > 0:
#print
import tweepy
from config import esconn, twitter_config
from get_stream_output_results import getStreamResultStatusIDs
# unicode mgmt
import sys
reload(sys)
sys.setdefaultencoding('utf8')
# go get elasticsearch connection
es = esconn.esconn()
# auth & api handlers
auth = tweepy.OAuthHandler(twitter_config.CONSUMER_KEY, twitter_config.CONSUMER_SECRET)
auth.set_access_token(twitter_config.ACCESS_TOKEN, twitter_config.ACCESS_TOKEN_SECRET)
api = tweepy.API(auth)
def getAttributesbyStatusID(statuses):
search = api.statuses_lookup(statuses, include_entities='yes')
for item in search:
# print str(item.id) + ': ' + item.text
yield item
# load Twitter screen_name & build a search
status_id_list = getStreamResultStatusIDs(size=10)
output = {item for item in getAttributesbyStatusID(status_id_list)}
print output
Arguments:
- credsfile (str): the full path of the filename that contains a JSON
file with credentials for Twitter
Returns:
A tweepy.api.API object
"""
fn = os.path.expanduser(credsfile) # get the full path in case the ~ is used
c = json.load(open(fn))
# Get authentication token
auth = tweepy.OAuthHandler(consumer_key = c['consumer_key'],
consumer_secret = c['consumer_secret'])
auth.set_access_token(c['access_token'], c['access_token_secret'])
# create an API handler
return tweepy.API(auth)
def tw_oauth(authfile):
with open(authfile, "r") as f:
ak = f.readlines()
f.close()
auth1 = tweepy.auth.OAuthHandler(ak[0].replace("\n",""), ak[1].replace("\n",""))
auth1.set_access_token(ak[2].replace("\n",""), ak[3].replace("\n",""))
return tweepy.API(auth1)