Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def on_hit(self, hit, tweet_count):
    """Write one hit as a CSV row, rotating to a new file when needed.

    A new ``tweets-NNN.csv`` file (``NNN`` is ``self.file_count``
    zero-padded to three digits) is started inside ``self.dataset_path``
    whenever ``tweet_count`` is a multiple of ``self.max_per_file``. Each
    new file begins with the heading row from ``json2csv.get_headings()``.

    Args:
        hit: Object whose ``tweet`` attribute holds the tweet as a JSON
            document; it is parsed with ``json.loads``.
        tweet_count: Running count used to decide when to rotate files.
    """
    # Cycle tweet files
    if tweet_count % self.max_per_file == 0:
        if self.file:
            self.file.close()
        file_name = 'tweets-{}.csv'.format(str(self.file_count).zfill(3))
        # newline='' lets the csv module control line endings itself;
        # without it, platforms that translate '\n' emit an extra '\r' per
        # row and newlines embedded in quoted fields are mishandled.
        self.file = open(
            os.path.join(self.dataset_path, file_name), 'w', newline='')
        self.sheet = csv.writer(self.file)
        self.sheet.writerow(json2csv.get_headings())
        self.file_count += 1
    # Write to tweet file
    # NOTE(review): relies on self.sheet having been set by a rotation
    # above or elsewhere in the class -- confirm the first call always
    # satisfies the rotation condition.
    self.sheet.writerow(json2csv.get_row(json.loads(hit.tweet), excel=True))
def get_row(t, extra_fields=None, excel=False):
    """Build the row for ``t`` and append any requested extra fields.

    The base row comes from ``json2csv.get_row(t, excel=excel)``. For each
    entry in ``extra_fields`` (when given and non-empty), the result of
    ``extra_field(t, entry)`` is appended, in order.
    """
    cells = json2csv.get_row(t, excel=excel)
    # A falsy extra_fields (None, empty list, ...) contributes nothing.
    cells.extend(extra_field(t, name) for name in (extra_fields or ()))
    return cells
def _row(self, item):
    """Return the row for ``item`` via ``twarc.json2csv.get_row`` with ``excel=True``."""
    to_row = twarc.json2csv.get_row
    return to_row(item, excel=True)