Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_extended_compat():
t_compat = twarc.Twarc(tweet_mode="compat")
assert 'full_text' in next(T.search('obama'))
assert 'text' in next(t_compat.search("obama"))
assert 'full_text' in next(T.timeline(screen_name="BarackObama"))
assert 'text' in next(t_compat.timeline(screen_name="BarackObama"))
import twarc
"""
You will need to have these environment variables set to run these tests:
* CONSUMER_KEY
* CONSUMER_SECRET
* ACCESS_TOKEN
* ACCESS_TOKEN_SECRET
"""
logging.basicConfig(filename="test.log", level=logging.INFO)
T = twarc.Twarc()
def test_version():
import setup
assert setup.__version__ == twarc.__version__
def test_search():
count = 0
for tweet in T.search('obama'):
assert tweet['id_str']
count += 1
if count == 10:
break
assert count == 10
def test_connection_error_get(oauth1session_class):
mock_oauth1session = MagicMock(spec=OAuth1Session)
oauth1session_class.return_value = mock_oauth1session
mock_oauth1session.get.side_effect = requests.exceptions.ConnectionError
t = twarc.Twarc("consumer_key", "consumer_secret", "access_token",
"access_token_secret", connection_errors=3,
validate_keys=False)
with pytest.raises(requests.exceptions.ConnectionError):
t.get("https://api.twitter.com")
assert 3 == mock_oauth1session.get.call_count
def test_connection_error_post(oauth1session_class):
mock_oauth1session = MagicMock(spec=OAuth1Session)
oauth1session_class.return_value = mock_oauth1session
mock_oauth1session.post.side_effect = requests.exceptions.ConnectionError
t = twarc.Twarc("consumer_key", "consumer_secret", "access_token",
"access_token_secret", connection_errors=2,
validate_keys=False)
with pytest.raises(requests.exceptions.ConnectionError):
t.post("https://api.twitter.com")
assert 2 == mock_oauth1session.post.call_count
lockfile = os.path.join(args.archive_dir, '') + "lockfile"
if not os.path.exists(lockfile):
pid = os.getpid()
lockfile_handle = open(lockfile, "w")
lockfile_handle.write(str(pid))
lockfile_handle.close()
else:
old_pid = "unknown"
with open(lockfile, "r") as lockfile_handle:
old_pid = lockfile_handle.read()
sys.exit("Another twarc-archive.py process with pid " + old_pid + " is running. If the process is no longer active then it may have been interrupted. In that case remove the 'lockfile' in " + args.archive_dir + " and run the command again.")
logging.info("logging search for %s to %s", args.search, args.archive_dir)
t = twarc.Twarc(consumer_key=args.consumer_key,
consumer_secret=args.consumer_secret,
access_token=args.access_token,
access_token_secret=args.access_token_secret,
profile=args.profile,
config=args.config,
tweet_mode=args.tweet_mode)
last_archive = get_last_archive(args.archive_dir)
if last_archive:
last_id = json.loads(next(gzip.open(last_archive, 'rt')))['id_str']
else:
last_id = None
if args.twarc_command == "search":
tweets = t.search(args.search, since_id=last_id)
elif args.twarc_command == "timeline":
USER_OK = "USER_OK"
USER_DELETED = "USER_DELETED"
USER_PROTECTED = "USER_PROTECTED"
USER_SUSPENDED = "USER_SUSPENDED"
TWEET_OK = "TWEET_OK"
TWEET_DELETED = "TWEET_DELETED"
# You have been blocked by the user.
TWEET_BLOCKED = "TWEET_BLOCKED"
RETWEET_DELETED = "RETWEET_DELETED"
ORIGINAL_TWEET_DELETED = "ORIGINAL_TWEET_DELETED"
ORIGINAL_TWEET_BLOCKED = "ORIGINAL_TWEET_BLOCKED"
ORIGINAL_USER_DELETED = "ORIGINAL_USER_DELETED"
ORIGINAL_USER_PROTECTED = "ORIGINAL_USER_PROTECTED"
ORIGINAL_USER_SUSPENDED = "ORIGINAL_USER_SUSPENDED"
t = twarc.Twarc()
def main(files, enhance_tweet=False, print_results=True):
counts = collections.Counter()
for count, line in enumerate(fileinput.input(files=files)):
if count % 10000 == 0:
logging.info("processed {:,} tweets".format(count))
tweet = json.loads(line)
result = examine(tweet)
if enhance_tweet:
tweet['delete_reason'] = result
print(json.dumps(tweet))
else:
print(tweet_url(tweet), result)
counts[result] += 1
if print_results:
def run(self):
term = self.search['term']
lang = self.search['lang']
count = self.search['count']
t = twarc.Twarc(
consumer_key=config['TWITTER_CONSUMER_KEY'],
consumer_secret=config['TWITTER_CONSUMER_SECRET'],
access_token=self.search['token'],
access_token_secret=self.search['secret']
)
with self.output().open('w') as fh:
i = 0
for tweet in t.search(term):
i += 1
if i > count:
break
if i % 500 == 0:
self.update_job(
date_path=self.search['date_path'],
status="STARTED: %s - %s/%s" %
(self.task_family, i, count)
def _create_twarc(self):
self.twarc = Twarc(self.message["credentials"]["consumer_key"],
self.message["credentials"]["consumer_secret"],
self.message["credentials"]["access_token"],
self.message["credentials"]["access_token_secret"],
http_errors=self.http_errors,
connection_errors=self.connection_errors,
tweet_mode="extended")