Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
filename = "{}{}{}.{}".format(
self.dirname,
self.options.output_prefix,
word_count,
self.options.output_extension
)
n_word_file = io.open(filename, 'a', encoding='utf-8')
n_word_file.write(tweet)
n_word_file.write("\n")
if self.options.verbose:
for word in self.words:
tweet = (colorama.Fore.CYAN + word).join(tweet.split(word))
tweet = (word + colorama.Fore.RESET).join(tweet.split(word))
pprint(word_count, tweet)
def summary(result):
    """Write one crawled result to ``writer`` and optionally echo it.

    ``self`` and ``writer`` are taken from the enclosing scope (not visible
    in this snippet).  When ``self.options.metadata_to_dict`` is set the
    result is written as-is; otherwise its ``title``, ``written_at`` and
    ``body`` entries are written as three ``@key:value`` records.
    """
    # Raw mode: hand the result object over untouched.
    if self.options.metadata_to_dict:
        if self.options.verbose:
            pprint(result)
        writer.write(result)
        return

    # Tagged mode: colourised preview first (verbose only), then the records.
    if self.options.verbose:
        pprint(Fore.CYAN + result['title'] + Fore.RESET)
        pprint(Fore.CYAN + Style.DIM + result['written_at'] + Style.RESET_ALL + Fore.RESET)
        pprint(result['body'])
    tagged_fields = (
        ("@title:", 'title'),
        ("@written_at:", 'written_at'),
        ("@body:", 'body'),
    )
    for tag, field in tagged_fields:
        writer.write(tag + result[field])
def summary(result):
    """Persist a single result through ``writer``, printing it when verbose.

    Relies on ``self`` and ``writer`` from the surrounding scope (outside
    this snippet).  With ``self.options.metadata_to_dict`` unset, the
    ``title``, ``written_at`` and ``body`` entries of ``result`` are emitted
    as ``@title:``, ``@written_at:`` and ``@body:`` records; with it set,
    ``result`` itself is passed straight to ``writer.write``.
    """
    as_tagged_records = not self.options.metadata_to_dict
    if as_tagged_records:
        if self.options.verbose:
            # Title in cyan, timestamp in dim cyan, body uncoloured.
            pprint(Fore.CYAN + result['title'] + Fore.RESET)
            pprint(Fore.CYAN + Style.DIM + result['written_at'] + Style.RESET_ALL + Fore.RESET)
            pprint(result['body'])
        for tag, field in (("@title:", 'title'),
                           ("@written_at:", 'written_at'),
                           ("@body:", 'body')):
            writer.write(tag + result[field])
    else:
        if self.options.verbose:
            pprint(result)
        writer.write(result)
def summary(result):
    """Write every entry of ``result`` to ``writer``, echoing when verbose.

    ``self`` and ``writer`` come from the enclosing scope (not shown here).
    Each entry is indexed by ``'date'``, ``'title'`` and ``'traffic'``.
    Depending on ``self.options.metadata_to_dict`` an entry is written either
    as one tab-separated line or as three ``@key:value`` records.
    """
    for content in result:
        if self.options.metadata_to_dict:
            # Single tab-separated line per entry.
            output = '\t'.join(
                content[field] for field in ('date', 'title', 'traffic')
            )
            if self.options.verbose:
                pprint(output)
            writer.write(output)
            continue

        if self.options.verbose:
            pprint(Fore.CYAN + content['date'] + Fore.RESET)
            pprint(Fore.CYAN + Style.DIM + content['title'] + Style.RESET_ALL + Fore.RESET)
            # Style.DIM is repeated twice for the traffic line, as in the other preview lines' scheme.
            pprint(Fore.CYAN + Style.DIM * 2 + content['traffic'] + Style.RESET_ALL + Fore.RESET)
        tagged_fields = (
            ("@date:", 'date'),
            ("@title:", 'title'),
            ("@traffic:", 'traffic'),
        )
        for tag, field in tagged_fields:
            writer.write(tag + content[field])
def save_and_print(self):
    """Fetch the current trend and write each keyword via ``self.writer``.

    When ``self.options.display_rank`` is set, every keyword is prefixed with
    its paired count as ``"<count>.<keyword>"``; otherwise the bare keyword is
    written.  Each written line is also printed when
    ``self.options.verbose`` is set.
    """
    counts, keywords = get_current_trend()

    if self.options.display_rank:
        # Lazily formatted so each line is built right before it is written.
        lines = (
            "{}.{}".format(count, keyword)
            for count, keyword in zip(counts, keywords)
        )
    else:
        lines = keywords

    for line in lines:
        self.writer.write(line)
        if self.options.verbose:
            pprint(line)
def stream(self):
    """Run ``self.job`` and retry it when it fails unexpectedly.

    The job runs inside a ``PropagatingThread`` (started and joined
    immediately) when ``self.is_async`` is set, otherwise it is called
    directly.

    Returns:
        ``False`` when the run ended with a ``RecursionError`` or a
        ``KeyboardInterrupt`` (including during a retry); ``None`` when the
        job finished normally.

    Raises:
        Exception: the last error raised by the job once ``self.recursion``
            has reached ``self.retry``.
    """
    try:
        if self.is_async:
            self._thread = PropagatingThread(target=self.job)
            self._thread.start()
            self._thread.join()
        else:
            self.job()
    except RecursionError:
        return False
    except KeyboardInterrupt:
        pprint("User has interrupted.")
        return False
    except Exception:
        # Only ordinary errors are retried; SystemExit and GeneratorExit
        # propagate instead of being swallowed by a bare ``except``.
        if self.recursion >= self.retry:
            raise
        pprint("Error has raised but continue to stream.")
        self.recursion += 1
        # Propagate the retry's outcome so an interrupted retry still
        # reports ``False`` to the caller.
        return self.stream()
def get_trend(self, date):
    """Request the trend page for ``date`` and return the parsed result.

    Sleeps for ``self.options.interval`` before every request.  A read or
    connect timeout triggers another attempt for the same ``date``; an HTTP
    error status makes the method return ``None``.
    """
    try:
        # Pause between requests: the site may block aggressive crawlers,
        # so keep the crawl gentle.
        time.sleep(self.options.interval)
        resp = self.request_trend(date)
        pprint(resp.status_code)
        resp.raise_for_status()
        return self.parse_trend(resp.content)
    except requests.exceptions.HTTPError:
        return None
    except (requests.exceptions.ReadTimeout, requests.exceptions.ConnectTimeout):
        # Timed out: try the same date again.
        return self.get_trend(date)
def summary(result):
    """Emit each entry of ``result`` through ``writer``; print if verbose.

    Uses ``self`` and ``writer`` from the surrounding scope (outside this
    snippet).  Entries are read by the keys ``'date'``, ``'title'`` and
    ``'traffic'``.  With ``self.options.metadata_to_dict`` set, an entry
    becomes one tab-joined line; otherwise it becomes ``@date:``,
    ``@title:`` and ``@traffic:`` records.
    """
    record_layout = (
        ("@date:", 'date'),
        ("@title:", 'title'),
        ("@traffic:", 'traffic'),
    )
    for content in result:
        tab_separated = self.options.metadata_to_dict
        if not tab_separated:
            if self.options.verbose:
                # Coloured preview of the three fields.
                pprint(Fore.CYAN + content['date'] + Fore.RESET)
                pprint(Fore.CYAN + Style.DIM + content['title'] + Style.RESET_ALL + Fore.RESET)
                pprint(Fore.CYAN + Style.DIM * 2 + content['traffic'] + Style.RESET_ALL + Fore.RESET)
            for tag, field in record_layout:
                writer.write(tag + content[field])
        else:
            output = '\t'.join((content['date'], content['title'], content['traffic']))
            if self.options.verbose:
                pprint(output)
            writer.write(output)
def get_trend(self):
    """Store the second item returned by ``get_current_trend`` on ``self.trend``.

    The first item of the returned pair is discarded.  The stored value is
    printed when ``self.options.verbose`` is set.
    """
    _unused, current = get_current_trend()
    self.trend = current
    if not self.options.verbose:
        return
    pprint(self.trend)