Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_writer_alias(self):
    """
    Round-trip three rows through ``csvkit.writer`` and ``csvkit.reader``.

    The last row carries a non-ASCII character so the UTF-8 handling of
    both aliases is exercised.
    """
    rows = [
        ['a', 'b', 'c'],
        ['1', '2', '3'],
        ['4', '5', u'ʤ'],
    ]

    # Write every row into an in-memory buffer
    buffer = six.StringIO()
    writer = csvkit.writer(buffer, encoding='utf-8')
    self.assertEqual(writer._eight_bit, True)
    for row in rows:
        writer.writerow(row)

    # Read the buffer back and expect the same rows in the same order
    written = six.StringIO(buffer.getvalue())
    reader = csvkit.reader(written, encoding='utf-8')
    for expected in rows:
        self.assertEqual(next(reader), expected)
def clean(self):
    """
    Cleans the provided source TSV file and writes it out in CSV format.

    Writes ``self.headers`` followed by every row yielded by
    ``self._convert_tsv()`` to ``self.csv_path``. When ``self.log_rows`` is
    non-empty, the rows it holds are written to ``self.error_log_path``
    beneath a ``headers, fields, value`` header line, and the error count is
    reported through ``self.failure`` if ``self.verbosity`` is above 2.
    """
    # Create the output object
    with open(self.csv_path, 'w') as csv_file:
        # Create the CSV writer
        csv_writer = csvkit.writer(csv_file)
        # Write the headers
        csv_writer.writerow(self.headers)
        # Write out the rows one at a time; a plain loop avoids building a
        # throwaway list of writerow() return values for the whole file
        for row in self._convert_tsv():
            csv_writer.writerow(row)

    # Nothing more to do when no errors were collected
    if not self.log_rows:
        return

    # Log to the terminal
    if self.verbosity > 2:
        msg = ' {} errors logged (not including empty lines)'
        self.failure(msg.format(len(self.log_rows)))

    # Log to the file
    with open(self.error_log_path, 'w') as log_file:
        log_writer = csvkit.writer(log_file, quoting=csv.QUOTE_ALL)
        log_writer.writerow(['headers', 'fields', 'value'])
        # Without this the log would only ever contain its header line
        log_writer.writerows(self.log_rows)
def export_sources_csv(cursor, output_filename):
    """
    Export the sources used by flows or exchange rates to a CSV file.

    Selects the bibliographic columns of every row in ``sources`` whose slug
    appears in ``flow_joined.source`` or ``exchange_rates.source``. The file
    starts with a header made of ``"bibliographic reference"`` followed by
    the selected column names, then one line per source as produced by
    ``formatRef``.

    :param cursor: sqlite3 cursor used to run the query. Its ``row_factory``
        is set to ``sqlite3.Row`` as a side effect.
    :param output_filename: path of the CSV file to write (opened with
        mode ``'w'``).
    :returns: ``0`` once the file has been written, ``1`` when the query
        returned no rows (in which case no file is created).
    """
    cursor.row_factory = sqlite3.Row
    sql = (
        "\n"
        "SELECT author,name,country,volume_number,volume_date,editor,edition_date,pages,shelf_number,URL,source_category,type,notes\n"
        "FROM sources as s\n"
        "WHERE s.slug in (SELECT distinct source from flow_joined) OR\n"
        "s.slug in (SELECT distinct source from exchange_rates)"
    )
    rows = cursor.execute(sql)

    # The first row is needed up front because it supplies the column names.
    # next() works on both Python 2 and 3 and lets an empty result be
    # reported through the return code instead of a StopIteration.
    first = next(rows, None)
    if first is None:
        return 1

    with open(output_filename, 'w') as f:
        dw = csvkit.writer(f)
        dw.writerow(["bibliographic reference"] + list(first.keys()))
        dw.writerow(formatRef(first))
        # The cursor has already advanced past the first row
        dw.writerows(formatRef(r) for r in rows)
    return 0
# Copy the exchange rates to a new file, replacing every source that has an
# entry in swapSources and remembering the ones that do not.
with open('../../csv_data/exchange_rates.csv', 'r') as f, \
        open('../../csv_data/new_exchange_rates.csv','w') as nf:
    rates = csvkit.DictReader(f)
    newRates = csvkit.DictWriter(nf, rates.fieldnames)
    newRates.writeheader()
    for rate in rates:
        if rate['source'] not in swapSources:
            missingSources.add(rate['source'])
        else:
            rate['source'] = swapSources[rate['source']]
        # The row is written whether or not its source was swapped
        newRates.writerow(rate)

# Dump the unmatched sources, one per line
with open('missing_sources.list','w') as ms:
    csvkit.writer(ms).writerows([source] for source in missingSources)
#!/usr/bin/env python
# Remove newline chars from CSV "cells"
# Input is taken from stdin and output spit to stdout
import csvkit
import sys
reader = csvkit.reader(sys.stdin)
writer = csvkit.writer(sys.stdout)
for row in reader:
    # Drop "\n" from every str cell; any other value is passed through as is
    writer.writerow([
        cell.replace("\n", '') if isinstance(cell, str) else cell
        for cell in row
    ])
# NOTE(review): this excerpt starts partway through a routine; `self` and
# `csv_file` are bound outside the visible lines.
# Create the CSV writer on the already-open output file
csv_writer = csvkit.writer(csv_file)
# Write the headers
csv_writer.writerow(self.headers)
# Write out the rows
[csv_writer.writerow(row) for row in self._convert_tsv()]
# Log errors if there are any
if self.log_rows:
# Log to the terminal, but only at verbosity levels above 2
if self.verbosity > 2:
msg = ' {} errors logged (not including empty lines)'
self.failure(msg.format(len(self.log_rows)))
# Log to the file, quoting every field
with open(self.error_log_path, 'w') as log_file:
log_writer = csvkit.writer(log_file, quoting=csv.QUOTE_ALL)
# Header line first, then every collected error row
log_writer.writerow(['headers', 'fields', 'value'])
log_writer.writerows(self.log_rows)
# Add counts to raw_file_record
self.raw_file.clean_columns_count = self.headers_count
self.raw_file.error_count = len(self.log_rows)
# Clean records are the downloaded records minus the ones logged as errors
self.raw_file.clean_records_count = self.raw_file.download_records_count - self.raw_file.error_count
# Add file size to the raw_file_record (`or 0` stores 0 for a falsy size)
self.raw_file.download_file_size = os.path.getsize(self.tsv_path) or 0
self.raw_file.clean_file_size = os.path.getsize(self.csv_path) or 0
# Save it in case it crashes in the next step
self.raw_file.save()