Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_rhiaro_accept_header(self):
    """Only send Accept header to rhiaro.co.uk right now.

    Checks header selection by URL and by a Twitter source with id
    'rhiaro', then checks that util.requests_get() is issued with the
    same headers.

    https://github.com/snarfed/bridgy/issues/713
    """
    conneg_headers = util.REQUEST_HEADERS_CONNEG
    rhiaro_url = 'http://rhiaro.co.uk/'

    # Header selection keyed on the URL.
    headers_for_url = util.request_headers(url=rhiaro_url)
    self.assertEqual(conneg_headers, headers_for_url)

    # Header selection keyed on the source object.
    rhiaro_source = Twitter(id='rhiaro')
    headers_for_source = util.request_headers(source=rhiaro_source)
    self.assertEqual(conneg_headers, headers_for_source)

    # Record the expected GET first; only after ReplayAll() is the real
    # call made, so the order of these three statements matters.
    self.expect_requests_get(rhiaro_url, '', headers=conneg_headers)
    self.mox.ReplayAll()
    util.requests_get(rhiaro_url)
def _get_api(self):
    """Return a twitter.Twitter client built from the bot's configuration.

    Reads the four "twitter-*" entries from self.bot.config and wraps them
    in a twitter.OAuth object.
    """
    config = self.bot.config
    # Same lookup order as before, so a missing key fails on the same entry.
    consumer_key = config["twitter-api-key"]
    consumer_secret = config["twitter-api-secret"]
    token = config["twitter-access-token"]
    token_secret = config["twitter-access-secret"]

    # OAuth takes the access token pair first, then the API key pair.
    credentials = twitter.OAuth(
        token, token_secret, consumer_key, consumer_secret)
    return twitter.Twitter(auth=credentials)
self.BOT_CONFIG["FOLLOWS_FILE"],
self.BOT_CONFIG["FOLLOWERS_FILE"]]:
if not os.path.isfile(sync_file):
with open(sync_file, "w") as out_file:
out_file.write("")
# check how old the follower sync files are and recommend updating them
# if they are old
if (time.time() - os.path.getmtime(self.BOT_CONFIG["FOLLOWS_FILE"]) > 86400 or
time.time() - os.path.getmtime(self.BOT_CONFIG["FOLLOWERS_FILE"]) > 86400):
print("Warning: Your Twitter follower sync files are more than a day old. "
"It is highly recommended that you sync them by calling sync_follows() "
"before continuing.", file=sys.stderr)
# create an authorized connection to the Twitter API
self.TWITTER_CONNECTION = Twitter(auth=OAuth(self.BOT_CONFIG["OAUTH_TOKEN"],
self.BOT_CONFIG["OAUTH_SECRET"],
self.BOT_CONFIG["CONSUMER_KEY"],
self.BOT_CONFIG["CONSUMER_SECRET"]))
description="Swift Navigation Tweetxample.")
parser.add_argument("TOKEN")
parser.add_argument("TOKEN_KEY")
parser.add_argument("CON_SEC")
parser.add_argument("CON_SEC_KEY")
parser.add_argument(
"-p",
"--port",
default=['/dev/ttyUSB0'],
nargs=1,
help="specify the serial port to use.")
args = parser.parse_args()
my_auth = twitter.OAuth(args.TOKEN, args.TOKEN_KEY, args.CON_SEC,
args.CON_SEC_KEY)
twit = twitter.Twitter(auth=my_auth)
# Open a connection to Piksi using the default baud rate (1Mbaud)
with PySerialDriver(args.port[0], baud=1000000) as driver:
# Create a handler to connect our Piksi driver to our callbacks
with Handler(driver.read, driver.write, verbose=True) as handler:
try:
for msg, metadata in handler.filter(SBP_MSG_TWEET):
if twit is not None:
twit.statuses.update(msg)
except KeyboardInterrupt:
pass
def get_api(self, on_behalf_of=None):
    """Return a Twitter client for the app itself or for a given user.

    When on_behalf_of is None the client is authenticated with
    get_app_oauth(); otherwise with get_user_oauth(on_behalf_of).
    """
    if on_behalf_of is not None:
        # A user was supplied: tweet from the user's twitter account.
        return Twitter(auth=self.get_user_oauth(on_behalf_of))
    # No user supplied: tweet from the app's account.
    return Twitter(auth=self.get_app_oauth())
def set_auth(self, conf_file):
    """Create the two API clients from conf_file and store them on self.

    self.creepy receives cree.Cree(conf_file) and self.t receives
    twitter.Twitter(conf_file).
    """
    # Each client is stored as soon as it is built, so self.creepy is
    # already set if constructing the second client fails.
    creepy_client = cree.Cree(conf_file)
    self.creepy = creepy_client

    # NOTE(review): conf_file is passed positionally here; confirm that this
    # `twitter` module's Twitter() expects a config path as first argument.
    twitter_client = twitter.Twitter(conf_file)
    self.t = twitter_client
SAMPLE_SIZE)
# now, determine how many of the tweets have been retweeted ~200 times,
# which is the number of results returned via an initial API request...
# Go to http://twitter.com/apps/new to create an app and get these items
consumer_key = ''
consumer_secret = ''
# authenticate with the twitter api
(oauth_token, oauth_token_secret) = oauth_dance('MiningTheSocialWeb',
consumer_key, consumer_secret)
t = twitter.Twitter(domain='api.twitter.com', api_version='1',
auth=twitter.oauth.OAuth(oauth_token, oauth_token_secret,
consumer_key, consumer_secret))
# protecting against Twitter API outages, etc. is omitted for clarity
stats = []
for (tweet_id, tweet_text) in tweet_ids_text:
# could fetch multiple pages worth of results here via the "page" parameter
# twitter module "workaround"
response = t.statuses.__getattr__(str(tweet_id)).retweeted_by(count=200)
retweeted_by = []
for r in response:
retweeted_by.append(r['screen_name'])
stats.append({
except SocialToken.DoesNotExist:
return None
kwargs = {
"status": post.status.encode("utf-8"),
}
if post.attached_media:
twt_up = Twitter(
domain='upload.twitter.com',
auth=OAuth(token, token_secret, app.client_id, app.secret)
)
media = []
contents = post.attached_media.read()
media.append(twt_up.media.upload(media=contents)["media_id_string"])
kwargs["media_ids"] = ",".join(media)
twt = Twitter(auth=OAuth(token, token_secret, app.client_id, app.secret))
twt.statuses.update(**kwargs)
post.is_posted = True
post.save()
def __init__(self, expected, auth, message, ratio=0.4, threshold=5):
    """Initialise the instance and log the minimum expected speed.

    Args:
        expected: Object exposing ``download`` and ``upload`` attributes.
            Each is multiplied by ``ratio`` and the results are stored as
            a ``Bandwidth`` in ``self.expected``.
        auth: Passed through as ``auth`` to ``twitter.Twitter``.
        message: Stored unchanged as ``self.message``.
        ratio: Multiplier applied to the expected download/upload values.
        threshold: Stored unchanged as ``self.threshold``; presumably the
            number of warnings tolerated before acting -- confirm against
            the code that reads ``warning_count``.
    """
    self.expected = Bandwidth(download=expected.download * ratio,
                              upload=expected.upload * ratio)
    self.message = message
    self.twitter = twitter.Twitter(auth=auth)
    self.warning_count = 0
    self.threshold = threshold
    self.last_tweet = 0

    logger = logging.getLogger(__name__)
    # Pass the values as logging arguments instead of pre-formatting with
    # `%`, so interpolation is left to the logging framework and only
    # happens when the record is actually emitted.
    logger.info('Minimum expected speed: %s/%s',
                ps(self.expected.download),
                ps(self.expected.upload))