def test_utf16_little(self):
    with open('examples/test_utf16_little.csv') as f:
        reader = unicsv.UnicodeCSVReader(f, encoding='utf-16')
        self.assertEqual(reader.next(), ['a', 'b', 'c'])
        self.assertEqual(reader.next(), ['1', '2', '3'])
        self.assertEqual(reader.next(), ['4', '5', u'ʤ'])

def test_utf16_big(self):
    with open('examples/test_utf16_big.csv') as f:
        reader = unicsv.UnicodeCSVReader(f, encoding='utf-16')
        self.assertEqual(reader.next(), ['a', 'b', 'c'])
        self.assertEqual(reader.next(), ['1', '2', '3'])
        self.assertEqual(reader.next(), ['4', '5', u'ʤ'])
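Both tests pass the same encoding='utf-16' to UnicodeCSVReader; the codec resolves endianness from the byte-order mark at the start of each fixture. A minimal sketch of generating such a fixture, assuming the path and contents of the first test:

import codecs

# The 'utf-16' codec writes a BOM, which is what lets the reader above use
# one encoding name for both little- and big-endian files.
with codecs.open('examples/test_utf16_little.csv', 'w', encoding='utf-16') as f:
    f.write(u'a,b,c\n1,2,3\n4,5,\u02a4\n')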
import sys

from csvkit.unicsv import UnicodeCSVReader

import config
import utils

if len(sys.argv) < 2:
    sys.exit('You must provide the filename of a CSV as an argument to this script.')

FILENAME = sys.argv[1]

collection = utils.get_geography2000_collection()

with open(FILENAME) as f:
    rows = UnicodeCSVReader(f)
    headers = rows.next()

    inserts = 0
    updates = 0
    row_count = 0

    for row in rows:
        row_count += 1

        # Shape of the geography document built from each row:
        geography = {
            #'sumlev': '',
            #'geoid': '',
            #'metadata': {},
            #'xrefs': [],
            #'data': {},
            #'xwalk': {}
        }
import sys

from csvkit.unicsv import UnicodeCSVReader
from pymongo import Connection  # pre-3.0 pymongo client, inferred from Connection() below

import config
import utils

if len(sys.argv) < 2:
    sys.exit('You must provide the filename of a CSV as an argument to this script.')

FILENAME = sys.argv[1]
YEAR = '2010'

connection = Connection()
db = connection[config.CENSUS_DB]
collection = db[config.GEOGRAPHIES_COLLECTION]

with open(FILENAME) as f:
    rows = UnicodeCSVReader(f)
    headers = rows.next()

    inserts = 0
    row_count = 0

    for row in rows:
        row_count += 1

        row_dict = dict(zip(headers, row))
        xref = utils.xref_from_row_dict(row_dict)

        geography = utils.find_geography_by_xref(collection, xref)

        if not geography:
            continue
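Each of these loader scripts turns a positional CSV row into a dict keyed by the header row before doing any lookups. The header names and values below are illustrative only:

# Illustrative only: how dict(zip(...)) pairs the header row with a data row.
headers = [u'SUMLEV', u'STATE', u'P0010001']
row = [u'040', u'17', u'12830632']
row_dict = dict(zip(headers, row))
# -> {u'SUMLEV': u'040', u'STATE': u'17', u'P0010001': u'12830632'}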
import sys

from csvkit.unicsv import UnicodeCSVReader
from pymongo import objectid

import config
import utils

if len(sys.argv) < 2:
    sys.exit('You must provide the filename of a CSV as an argument to this script.')

FILENAME = sys.argv[1]
YEAR = '2010'

collection = utils.get_geography_collection()

with open(FILENAME) as f:
    rows = UnicodeCSVReader(f)
    headers = rows.next()

    updates = 0
    row_count = 0

    for row in rows:
        row_count += 1

        row_dict = dict(zip(headers, row))
        xref = utils.xref_from_row_dict(row_dict)

        geography = utils.find_geography_by_xref(collection, xref, fields=['data'])

        if not geography:
            continue
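The loop above is cut off after the guard clause. A plausible continuation, hypothetical rather than taken from the original script, would merge the row's values under the YEAR key and count the save:

        # Hypothetical continuation: attach this row's values under YEAR
        # and persist the document; field placement is illustrative.
        geography.setdefault('data', {})[YEAR] = row_dict
        collection.save(geography)
        updates += 1

print 'File: %s' % FILENAME
print ' Row count: %i' % row_count
print ' Updated: %i' % updates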
# If reading from AWS...
if self.s3_key:
    # ...dump the contents into s.
    self.s3_key.get_contents_to_file(s)
# If reading locally...
else:
    # ...read the file out of DATA_DIR.
    with open(self.fpath, 'r') as f:
        s.write(f.read())

# Go to start of file.
s.seek(0)

# Find out what types of columns we'll need to store the data.
with gzip.GzipFile(fileobj=s, mode='rb') as f:
    reader = UnicodeCSVReader(f)
    header = map(slugify, reader.next())

    col_types = []  # Will be a list of pairs: (column_type, is_nullable)

    try:  # Were data_types specified at init?
        types = getattr(self, 'data_types')
        col_map = {c['field_name']: c['data_type'] for c in types}
        for col in header:
            t = col_map[col]
            col_types.append((COL_TYPES[t], True))  # always nullable
    except AttributeError:  # Try to infer column types.
        for col in range(len(header)):
            col_types.append(iter_column(col, f))

# Create rows that will be used to keep track of the version of the source dataset.
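iter_column and COL_TYPES are defined elsewhere in that module. As a rough, hypothetical sketch of the inference pattern, not the project's actual code, a per-column scan might pick the narrowest type that fits every value while tracking nullability:

# Hypothetical sketch: infer (type_name, is_nullable) for one column from
# already-parsed rows; iter_column in the real module reads the file object.
def infer_column(idx, rows):
    nullable = False
    seen = set()
    for row in rows:
        val = row[idx] if idx < len(row) else u''
        if val == u'':
            nullable = True
            continue
        try:
            int(val)
            seen.add('integer')
            continue
        except ValueError:
            pass
        try:
            float(val)
            seen.add('float')
        except ValueError:
            seen.add('text')
    if not seen or 'text' in seen:
        return ('text', nullable)
    if seen == set(['integer']):
        return ('integer', nullable)
    return ('float', nullable)  # a mix of ints and floats widens to float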
#!/usr/bin/env python

"""
Python2-specific classes.
"""

import six

from csvkit import unicsv


class CSVKitReader(unicsv.UnicodeCSVReader):
    """
    A unicode-aware CSV reader.
    """
    pass


class CSVKitWriter(unicsv.UnicodeCSVWriter):
    """
    A unicode-aware CSV writer.
    """
    def __init__(self, f, encoding='utf-8', line_numbers=False, **kwargs):
        self.row_count = 0
        self.line_numbers = line_numbers

        if 'lineterminator' not in kwargs:
            kwargs['lineterminator'] = '\n'

        # Delegate the rest of the setup to the underlying unicode writer.
        unicsv.UnicodeCSVWriter.__init__(self, f, encoding, **kwargs)
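A minimal round-trip sketch using the two classes above (Python 2; the file name is hypothetical). The writer encodes unicode rows on the way out and the reader decodes them on the way back in:

# Hypothetical file name; rows are lists of unicode strings in both directions.
with open('cities.csv', 'wb') as f:
    writer = CSVKitWriter(f, encoding='utf-8')
    writer.writerow([u'name', u'country'])
    writer.writerow([u'S\xe3o Paulo', u'Brazil'])

with open('cities.csv', 'rb') as f:
    reader = CSVKitReader(f, encoding='utf-8')
    for row in reader:
        print row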
if f:
    if allowed_file(f.filename):
        inp = StringIO(f.read())

        if sys.getsizeof(inp.getvalue()) <= MAX_CONTENT_LENGTH:
            inp.seek(0)
            file_format = convert.guess_format(f.filename)

            try:
                converted = convert.convert(inp, file_format)
            except UnicodeDecodeError:
                context['errors'] = ['We had a problem reading your file. '
                                     'This could have to do with the file encoding or format.']
                converted = None

            f.seek(0)

            if converted:
                outp = StringIO(converted)
                reader = UnicodeCSVReader(outp)
                session['header_row'] = reader.next()

                rows = []
                columns = [[] for c in session['header_row']]
                column_ids = range(len(session['header_row']))

                # Sample at most the first 100 data rows.
                for row in range(100):
                    try:
                        rows.append(reader.next())
                    except StopIteration:
                        break

                # Transpose the sampled rows into per-column lists.
                for i, row in enumerate(rows):
                    for j, d in enumerate(row):
                        columns[j].append(row[column_ids[j]])

                sample_data = []
                guesses = {}

                for index, header_val in enumerate(session['header_row']):
                    guesses[index] = guess_geotype(header_val, columns[index])
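guess_geotype is defined elsewhere in that app. Based only on the call signature above, a hypothetical sketch of what such a helper might check:

import re

# Hypothetical sketch only: pick a geography type from the header name and
# a sample of column values; the returned type names are illustrative.
def guess_geotype(header, values):
    header = header.strip().lower()
    sample = [v for v in values if v]
    if 'zip' in header and all(re.match(r'^\d{5}(-\d{4})?$', v) for v in sample):
        return 'zip_5'
    if header in ('state', 'st', 'state_abbr'):
        return 'state'
    return None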