Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_extended_compat():
    """Check which text field each tweet mode exposes.

    Tweets from the shared module-level client ``T`` must carry a
    ``full_text`` key, while tweets from a client created with
    ``tweet_mode="compat"`` must carry a ``text`` key. Both the search
    and the timeline calls are checked, looking only at the first tweet
    each one yields.
    """
    compat_client = twarc.Twarc(tweet_mode="compat")

    # Search results.
    first_extended_hit = next(T.search('obama'))
    assert 'full_text' in first_extended_hit
    first_compat_hit = next(compat_client.search("obama"))
    assert 'text' in first_compat_hit

    # User timeline.
    first_extended_status = next(T.timeline(screen_name="BarackObama"))
    assert 'full_text' in first_extended_status
    first_compat_status = next(compat_client.timeline(screen_name="BarackObama"))
    assert 'text' in first_compat_status
import twarc
"""
You will need to have these environment variables set to run these tests:
* CONSUMER_KEY
* CONSUMER_SECRET
* ACCESS_TOKEN
* ACCESS_TOKEN_SECRET
"""
# Send log output produced during the test run to test.log (INFO and above).
logging.basicConfig(filename="test.log", level=logging.INFO)
# Module-level client, built with no arguments and shared by the tests in
# this file. NOTE(review): presumably it reads its credentials from the
# environment variables listed in the note above -- confirm.
T = twarc.Twarc()
def test_version():
    """Check that setup and the twarc package declare the same version."""
    # Imported here rather than at module level, as in the original test.
    import setup

    declared_in_setup = setup.__version__
    declared_in_package = twarc.__version__
    assert declared_in_setup == declared_in_package
def test_search():
    """Pull ten tweets from a search and check each one has an id.

    Iteration stops after the tenth result; the test fails if the search
    yields fewer than ten tweets or if any tweet has an empty ``id_str``.
    """
    wanted = 10
    seen = 0
    for seen, tweet in enumerate(T.search('obama'), start=1):
        assert tweet['id_str']
        if seen == wanted:
            break
    assert seen == wanted
def test_connection_error_get(oauth1session_class):
    """Check that ``get`` stops after ``connection_errors`` failed attempts.

    Args:
        oauth1session_class: stand-in for the session class; its return
            value is replaced with a mock session whose ``get`` always
            raises ``requests.exceptions.ConnectionError``.
            NOTE(review): presumably injected by a patch decorator that
            is not visible in this snippet -- confirm.
    """
    expected_attempts = 3

    failing_session = MagicMock(spec=OAuth1Session)
    failing_session.get.side_effect = requests.exceptions.ConnectionError
    oauth1session_class.return_value = failing_session

    client = twarc.Twarc(
        "consumer_key",
        "consumer_secret",
        "access_token",
        "access_token_secret",
        connection_errors=expected_attempts,
        validate_keys=False,
    )

    with pytest.raises(requests.exceptions.ConnectionError):
        client.get("https://api.twitter.com")

    # One call per allowed connection error, then the exception surfaces.
    assert expected_attempts == failing_session.get.call_count
def test_connection_error_post(oauth1session_class):
    """Check that ``post`` stops after ``connection_errors`` failed attempts.

    Args:
        oauth1session_class: stand-in for the session class; its return
            value is replaced with a mock session whose ``post`` always
            raises ``requests.exceptions.ConnectionError``.
            NOTE(review): presumably injected by a patch decorator that
            is not visible in this snippet -- confirm.
    """
    expected_attempts = 2

    failing_session = MagicMock(spec=OAuth1Session)
    failing_session.post.side_effect = requests.exceptions.ConnectionError
    oauth1session_class.return_value = failing_session

    client = twarc.Twarc(
        "consumer_key",
        "consumer_secret",
        "access_token",
        "access_token_secret",
        connection_errors=expected_attempts,
        validate_keys=False,
    )

    with pytest.raises(requests.exceptions.ConnectionError):
        client.post("https://api.twitter.com")

    # One call per allowed connection error, then the exception surfaces.
    assert expected_attempts == failing_session.post.call_count
def test_version():
    """Verify that ``setup.__version__`` equals ``twarc.__version__``."""
    import setup

    setup_version = setup.__version__
    package_version = twarc.__version__
    assert setup_version == package_version
# Take the per-archive-directory lock, or stop if another run holds it.
lockfile = os.path.join(args.archive_dir, "lockfile")
try:
    # O_CREAT | O_EXCL makes "does it exist? / create it" one atomic step,
    # so two processes started at the same moment cannot both take the
    # lock. Mode 0o666 (before umask) matches what the builtin open() uses.
    lock_fd = os.open(lockfile, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o666)
except OSError:
    if not os.path.exists(lockfile):
        # The failure was not "lockfile already exists" (missing directory,
        # permissions, ...): let the original error propagate.
        raise
    with open(lockfile, "r") as lockfile_handle:
        # An empty file means the owner has not written its pid yet.
        old_pid = lockfile_handle.read().strip() or "unknown"
    sys.exit("Another twarc-archive.py process with pid " + old_pid + " is running. If the process is no longer active then it may have been interrupted. In that case remove the 'lockfile' in " + args.archive_dir + " and run the command again.")
else:
    # Record our pid so a later run can report who holds the lock.
    pid = os.getpid()
    with os.fdopen(lock_fd, "w") as lockfile_handle:
        lockfile_handle.write(str(pid))
logging.info("logging search for %s to %s", args.search, args.archive_dir)

# Build the client from the command-line options.
t = twarc.Twarc(
    consumer_key=args.consumer_key,
    consumer_secret=args.consumer_secret,
    access_token=args.access_token,
    access_token_secret=args.access_token_secret,
    profile=args.profile,
    config=args.config,
    tweet_mode=args.tweet_mode,
)

# Resume point: the id of the tweet stored on the first line of the last
# archive, or None when there is no previous archive.
# NOTE(review): get_last_archive presumably returns the path of the most
# recent archive file, or a falsy value if there is none -- confirm.
last_archive = get_last_archive(args.archive_dir)
if last_archive:
    # Use a context manager so the gzip handle is closed right away
    # instead of being left open until garbage collection.
    with gzip.open(last_archive, 'rt') as archive_handle:
        last_id = json.loads(next(archive_handle))['id_str']
else:
    last_id = None
if args.twarc_command == "search":
tweets = t.search(args.search, since_id=last_id)
elif args.twarc_command == "timeline":
def on_hit(self, hit, tweet_count):
    """Append one tweet to the current CSV file, rotating files when due.

    Whenever ``tweet_count`` is a multiple of ``self.max_per_file`` the
    current file (if any) is closed and a new ``tweets-NNN.csv`` file is
    opened in ``self.dataset_path``, starting with a heading row.

    Args:
        hit: object whose ``tweet`` attribute holds the tweet as a JSON
            string.
        tweet_count: running tweet counter supplied by the caller.
            NOTE(review): presumably starts at 0 so that the first call
            opens the first file -- confirm with the caller.
    """
    # Cycle tweet CSV files
    if tweet_count % self.max_per_file == 0:
        if self.file:
            self.file.close()
        csv_name = 'tweets-{}.csv'.format(str(self.file_count).zfill(3))
        # newline='' is what the csv module requires for files handed to
        # csv.writer; without it, platforms that translate "\n" on write
        # end up with an extra blank line after every row.
        self.file = open(
            os.path.join(self.dataset_path, csv_name), 'w', newline='')
        self.sheet = csv.writer(self.file)
        self.sheet.writerow(json2csv.get_headings())
        self.file_count += 1
    # Write to tweet file
    tweet = json.loads(hit.tweet)
    self.sheet.writerow(json2csv.get_row(tweet, excel=True))
# Positional arguments: the input files; "-" (the default) means stdin.
parser.add_argument(
    'files',
    metavar='FILE',
    nargs='*',
    default=['-'],
    help='files to read, if empty, stdin is used',
)
args = parser.parse_args()

# Case-insensitive matching is opt-in.
flags = re.IGNORECASE if args.ignore else 0

# Report an invalid pattern as a one-line error instead of a traceback.
try:
    regex = re.compile(args.regex, flags)
except Exception as err:
    sys.exit("error: regex failed to compile: {}".format(err))

# Echo, unchanged, every input line whose tweet text matches the pattern.
for line in fileinput.input(files=args.files):
    tweet_text = json2csv.text(json.loads(line))
    if not regex.search(tweet_text):
        continue
    print(line, end='')
def on_hit(self, hit, tweet_count):
    """Write a tweet as a CSV row, starting a new CSV file when needed.

    A new ``tweets-NNN.csv`` file (heading row first) is opened in
    ``self.dataset_path`` each time ``tweet_count`` is a multiple of
    ``self.max_per_file``; any previously open file is closed first.

    Args:
        hit: object whose ``tweet`` attribute is the tweet serialized as
            JSON text.
        tweet_count: running tweet counter supplied by the caller.
            NOTE(review): presumably 0 on the first call, which is what
            opens the first file -- confirm with the caller.
    """
    # Cycle tweet CSV files
    if tweet_count % self.max_per_file == 0:
        if self.file:
            self.file.close()
        file_name = 'tweets-{}.csv'.format(str(self.file_count).zfill(3))
        file_path = os.path.join(self.dataset_path, file_name)
        # The csv module requires newline='' on files given to csv.writer;
        # otherwise newline translation on write can insert a blank line
        # after every row.
        self.file = open(file_path, 'w', newline='')
        self.sheet = csv.writer(self.file)
        self.sheet.writerow(json2csv.get_headings())
        self.file_count += 1
    # Write to tweet file
    row = json2csv.get_row(json.loads(hit.tweet), excel=True)
    self.sheet.writerow(row)
args = parser.parse_args()
command = args.command
query = args.query if args.query else ""

logging.basicConfig(
    filename=args.log,
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)

# Exit quietly on ctrl-c so users are not shown a stack trace.
signal.signal(signal.SIGINT, lambda signum, frame: sys.exit(0))

# "version": print the version and stop.
if command == "version":
    print("twarc v%s" % __version__)
    sys.exit()

# "help", or no command at all: show usage plus the command list, then
# exit with a non-zero status.
if command == "help" or not command:
    parser.print_help()
    print("\nPlease use one of the following commands:\n")
    for name in commands:
        print(" - %s" % name)
    print("\nFor example:\n\n twarc search blacklivesmatter")
    sys.exit(1)

# Keys are validated unless the command is "configure" or the user asked
# to skip validation.
validate_keys = not (command == "configure" or args.skip_key_validation)