Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_delete_start_redirect_url_error(self):
    """Check that an OAuth error while starting a delete is reported to the user.

    ``testutil.OAuthStartHandler.redirect_url`` is stubbed to raise a
    ``tweepy.TweepError``. A POST to ``/delete/start`` is then expected to
    answer with a 302 whose Location has path ``/fake/0123456789`` and
    carries the error message in the URL fragment.
    """
    self.mox.StubOutWithMock(testutil.OAuthStartHandler, 'redirect_url')
    testutil.OAuthStartHandler.redirect_url(
        state=mox.IgnoreArg(),
    ).AndRaise(tweepy.TweepError('Connection closed unexpectedly...'))
    self.mox.ReplayAll()

    body = native_str(urllib.parse.urlencode({
        'feature': 'listen',
        'key': self.sources[0].key.urlsafe(),
    }))
    resp = app.application.get_response(
        '/delete/start', method='POST', body=body)

    # assertEqual is used instead of the assertEquals alias, which is
    # deprecated and no longer exists on unittest.TestCase in Python 3.12+.
    self.assertEqual(302, resp.status_int)
    location = urllib.parse.urlparse(resp.headers['Location'])
    self.assertEqual('/fake/0123456789', location.path)
    # The fragment is percent-encoded in the header, so decode before comparing.
    self.assertEqual(
        '!FakeSource API error 504: Connection closed unexpectedly...',
        urllib.parse.unquote(location.fragment))
# Fetch follower ids for one account and hand them to write_ids().
# NOTE(review): the enclosing function header is not part of this span;
# user_id, screen_name and writer are presumably its parameters -- confirm.
# initialize config and Twitter API
config = read_config()
api = get_app_auth_api(config)
# process user id or screen name, storing returned ids in plain text
# "count" is always sent; user_id / screen_name are each added only when not
# None (the two checks are independent, so both can end up in args).
args = {"count": FOLLOWERS_IDS_COUNT}
if user_id is not None:
args.update({"user_id": user_id})
if screen_name is not None:
args.update({"screen_name": screen_name})
# integer "limit" option from the "followers" config section, forwarded to write_ids
limit = config.getint("followers", "limit")
try:
# write_ids' return value is logged below as the number of ids downloaded
num_ids = write_ids(writer, api.followers_ids, args, cursored=True, limit=limit)
LOGGER.info("downloaded %d follower id(s)", num_ids)
except TweepError as err:
# a Twitter API error is passed to log_tweep_error rather than re-raised
log_tweep_error(LOGGER, err)
# finished
LOGGER.info("get_followers() finished")
# Set up the storage handle and an authenticated tweepy API client on self.
# NOTE(review): the enclosing method header is not part of this span; db and
# storage_dir are presumably its parameters -- confirm.
# Fall back to a DB built from storage_dir when no (truthy) db was supplied.
if not db:
db = DB(storage_dir)
self.db = db
# The four OAuth credentials are read from the DB's config store.
ck = self.db.get_config('consumer_key')
cs = self.db.get_config('consumer_secret')
at = self.db.get_config('access_token')
ats = self.db.get_config('access_token_secret')
# The API client is given a ModelParser backed by wtModelFactory instead of
# tweepy's default parser.
mf = wtModelFactory()
pr = ModelParser(model_factory=mf)
self.auth = OAuthHandler(ck, cs)
self.auth.set_access_token(at, ats)
self.api = API(self.auth, parser=pr)
try:
# The value is discarded; the call is made only so that a failure surfaces
# here (presumably as an early credential/connectivity check -- confirm).
self.api.me().name
except TweepError as error:
raise TwitterError("Could not connect to Twitter: %s" % error)
except TypeError as error:
# NOTE(review): presumably a TypeError arises when stored keys are missing
# (None); the caught error itself is not included in the message.
raise TwitterError("Your keys haven't been set correctly!")
def unfollow_cb(data, buffer, args):
    """Stop following the user named by ``args``.

    On success the user is removed from the ``__TIMELINE`` buffer's nicklist
    and a confirmation is printed. A ``TwitterError`` or
    ``tweepy.TweepError`` is reported through ``print_error`` instead of
    propagating. ``wc.WEECHAT_RC_OK`` is returned in both cases.
    """
    global twitter, buffers
    try:
        twitter.get_user(args).unfollow()
    except (TwitterError, tweepy.TweepError) as error:
        print_error(error)
    else:
        # Only touch the nicklist once the unfollow call went through.
        remove_from_nicklist(buffers['__TIMELINE'], args)
        print_success("User @%s unfollowed." % args)
    return wc.WEECHAT_RC_OK
'that keys are placed in same order '
'as mentioned in README')
else:
# keys is read positionally: consumer key, consumer secret, access token,
# access token secret.
consumer_key = keys[0]
consumer_secret = keys[1]
access_token = keys[2]
access_token_secret = keys[3]
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
# The client is stored on the class attribute before it is verified.
# NOTE(review): if verification below raises, APIHandler._api stays set to
# the unverified client -- confirm that callers do not reuse it.
APIHandler._api = tweepy.API(auth)
try:
APIHandler._api.verify_credentials()
# RateLimitError is handled before the broader TweepError (presumably its
# base class, so the order matters -- confirm).
except RateLimitError:
raise MaximumLimitCrossedException('Rate limit for API is reached. '
'Please wait for 15 minutes.')
except TweepError:
# Any other tweepy failure is reported as an invalid/expired token.
raise InvalidAPIException('Invalid or expired token')
return APIHandler._api
# Interactive OAuth flow: print the authorization URL, read the verifier code
# from stdin, then store the resulting access token pair through db.
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
print('Please visit: ' + auth.get_authorization_url())
# NOTE(review): the next four lines read the two request-token fields and
# write the same values straight back; this looks like a no-op -- confirm
# before removing.
oauth_token = auth.request_token['oauth_token']
oauth_token_secret = auth.request_token['oauth_token_secret']
auth.request_token['oauth_token'] = oauth_token
auth.request_token['oauth_token_secret'] = oauth_token_secret
verifier = input('Verifier code: ')
try:
# a[0] / a[1] are saved under 'access_token' / 'access_token_secret'
# (presumably a is a (token, secret) pair -- confirm).
a = auth.get_access_token(verifier)
db.find_and_modify('access_token', a[0])
db.find_and_modify('access_token_secret', a[1])
except tweepy.TweepError as e:
# A tweepy failure is only printed; nothing is stored in that case.
print(e)
@json.json_view
def unfollow_twitter_account(request):
    """Unfollow ``username`` on Twitter using the requesting user's account.

    Reads ``username`` from ``request.POST``. Only 'samuelclay' and
    'newsblur' may be unfollowed through this view; any other name gets an
    ``HttpResponseForbidden``.

    Returns a dict with ``code`` (1 on success, -1 when tweepy raised) and
    ``message`` ("OK", or the caught ``tweepy.TweepError``).
    """
    username = request.POST['username']
    code = 1
    message = "OK"

    # The attempt is logged before the allow-list check, so refused requests
    # show up in the log as well.
    logging.user(request, "~BB~FRUnfollowing Twitter: %s" % username)

    if username not in ['samuelclay', 'newsblur']:
        return HttpResponseForbidden()

    social_services = MSocialServices.objects.get(user_id=request.user.pk)
    try:
        api = social_services.twitter_api()
        api.destroy_friendship(username)
    # "except X as e" works on Python 2.6+ and 3; the former comma form
    # ("except X, e") is a SyntaxError on Python 3.
    except tweepy.TweepError as e:
        code = -1
        # NOTE(review): the exception object itself is returned as the
        # message; confirm json_view can serialise it.
        message = e

    return {'code': code, 'message': message}
# No previous authentication data: authenticate via the browser, then save the
# resulting access token pair to twitterAccess.txt.
# NOTE(review): the consumer key and secret are hard-coded here and again in
# the else branch; they should come from configuration/environment and be
# rotated, since they are exposed in source.
auth = tweepy.OAuthHandler("Bo0Usm7MbQLJGTUv3jAuwY1qz", "hx0gsX49KIDsm1MgWFuuYOV1zrR06Tcz5r06RmMxn4SsR41dFr")
try:
redirect_url = auth.get_authorization_url()
print("Redirect url:", redirect_url)
# copy the redirect url to the clipboard
pyperclip.copy(redirect_url)
except tweepy.TweepError:
# NOTE(review): the failure is only printed; the verifier prompt below
# still appears to run afterwards -- confirm against the original nesting.
print ('Error! Failed to get request token.')
verifier = input('Verifier:')
try:
auth.get_access_token(verifier)
except tweepy.TweepError:
# NOTE(review): only printed; the lines below read auth.access_token
# regardless of whether the exchange succeeded -- confirm.
print ('Error! Failed to get access token.')
access_token = auth.access_token
access_token_secret = auth.access_token_secret
# File layout: access token, a newline, then the secret with no trailing newline.
# NOTE(review): the file is opened and closed by hand; a with-block would
# also close it if write() raises.
twitterAuthData = open("twitterAccess.txt", "w")
twitterAuthData.write(auth.access_token+"\n"+auth.access_token_secret);
twitterAuthData.close();
else:
# already got auth data, read it from file
twitterAuthData = open("twitterAccess.txt", "r")
access_token = twitterAuthData.readline()[:-1] #exclude line feed from token string
# The second line is used as read (the writer above adds no newline after it).
access_token_secret = twitterAuthData.readline()
twitterAuthData.close()
auth = tweepy.OAuthHandler("Bo0Usm7MbQLJGTUv3jAuwY1qz", "hx0gsX49KIDsm1MgWFuuYOV1zrR06Tcz5r06RmMxn4SsR41dFr")
# For every non-empty user id in id_list, page through that user's friend ids
# and append one JSON object per id to the output file.
# NOTE(review): num_inputs_queried, api_pool, output and id_list are defined
# outside this span.
# 'w+' truncates any existing output file.
# NOTE(review): the handle is closed only on the normal path at the end; an
# exception other than TweepError would leave it open.
write_fd = open(output, 'w+')
for userid in id_list:
num_inputs_queried = num_inputs_queried + 1
if not userid == '':
try:
count = 0
for item in Cursor(api_pool.friends_ids, id=userid).items():
logger.debug('user id: {}'.format(item))
count = count + 1
# Each record carries the friend id, the queried user id and a UTC
# timestamp with a literal "+0000" suffix.
tweet_item = {'id': item}
tweet_item['smapp_original_user_id'] = userid
tweet_item['smapp_timestamp'] = datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S +0000')
write_fd.write(json.dumps(tweet_item))
write_fd.write('\n')
except TweepError as e:
# A tweepy error is logged at info level and not re-raised; ids written
# before the error stay in the file.
logger.info('tweepy error: %s', e)
logger.info('counted %s objects for input %s', count, userid)
logger.info('number of inputs queried so far: %s', num_inputs_queried)
write_fd.close()
except tweepy.TweepError as e:
print(e.reason)
except StopIteration:
break
# When retweeting is enabled, retweet up to numberOfTweets results of the
# search query.
if retweet == "yes":
for tweet in tweepy.Cursor(api.search, search).items(numberOfTweets):
try:
#Retweet
tweet.retweet()
print('Retweeted the tweet')
except tweepy.TweepError as e:
# The API error reason is printed and the loop moves on to the next tweet.
print(e.reason)
except StopIteration:
# NOTE(review): the for statement already ends when the cursor is
# exhausted, so this handler fires only if tweet.retweet() itself raises
# StopIteration -- likely dead code, confirm.
break
if favorite == "yes":
for tweet in tweepy.Cursor(api.search, search).items(numberOfTweets):
try:
#Favorite
tweet.favorite()
print('Favorited the tweet')
except tweepy.TweepError as e:
print(e.reason)
except StopIteration: