Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_extra_header(self):
    # Parse the "extra header" fixture and check the resulting table:
    # it must be a Table of Column objects with exactly four columns,
    # each holding a single value (1, 2, 3 and None respectively).
    # NOTE(review): presumably the fixture declares one more header than
    # it has data values, which is why the last column holds None --
    # confirm against examples/test_extra_header.csv.
    with open('examples/test_extra_header.csv', 'r') as csv_file:
        parsed = table.Table.from_csv(csv_file)

    self.assertEqual(type(parsed), table.Table)
    self.assertEqual(type(parsed[0]), table.Column)
    self.assertEqual(len(parsed), 4)

    # One expected single-value column per position, in order.
    expected_columns = ([1], [2], [3], [None])
    for position, expected in enumerate(expected_columns):
        self.assertEqual(parsed[position], expected)
def test_from_csv_dev_null(self):
    # Smoke test: building a table from an empty input stream must not
    # raise. There is nothing to assert on the result; the test passes
    # as long as from_csv() returns normally.
    #
    # os.devnull is used instead of a hard-coded '/dev/null' so the test
    # also runs on platforms where that path does not exist (e.g. 'nul'
    # on Windows).
    import os

    with open(os.devnull, 'r') as empty_stream:
        table.Table.from_csv(empty_stream)
def test_from_csv(self):
    # Parse the converted fixed-width fixture and check the table shape
    # (a Table of eight Column objects) plus the inferred Python types
    # of the first value in the third and fourth columns.
    with open('examples/testfixed_converted.csv', 'r') as csv_file:
        parsed = table.Table.from_csv(csv_file)

    self.assertEqual(type(parsed), table.Table)
    self.assertEqual(type(parsed[0]), table.Column)
    self.assertEqual(len(parsed), 8)

    # Third column: first value must be the integer 40.
    third_value = parsed[2][0]
    self.assertEqual(third_value, 40)
    self.assertEqual(type(third_value), int)

    # Fourth column: first value must be the boolean True. The type is
    # checked separately because 1 == True would also compare equal.
    fourth_value = parsed[3][0]
    self.assertEqual(fourth_value, True)
    self.assertEqual(type(fourth_value), bool)
def _get_column_types(self):
    # Infer a SQL column type for every column of the CSV at self.path.
    #
    # The file is parsed into a csvkit table, which sql.make_table()
    # turns into a SQL table definition; each column's type is then
    # read back as a string and adjusted.
    #
    # NOTE(review): the visible body computes `clean_type` and the
    # adjusted `raw_type` but never stores or returns them, and `i` is
    # unused -- this looks like a truncated excerpt; confirm against the
    # full source before relying on it.
    self.tracker.forward('Inferring datatype of columns')

    with open(self.path, 'r') as csv_file:
        csv_table = table.Table.from_csv(csv_file, delimiter=',')

    sql_table = sql.make_table(csv_table)

    # Matches a parenthesised word suffix such as the "(10)" in
    # "VARCHAR(10)"; compiled once rather than on every iteration.
    paren_suffix = re.compile(r'\(\w+\)')

    for i, column in enumerate(sql_table.columns):
        raw_type = str(column.type)
        # Type name with any "(...)" suffix removed.
        clean_type = paren_suffix.sub('', raw_type)

        if raw_type == 'BOOLEAN':
            # Temporary fix for issue #19
            raw_type = 'VARCHAR(10)'
        elif raw_type == 'DATETIME':
            # Rough guess at the maximum length of a datetime field;
            # a better approach is still needed.
            raw_type = 'VARCHAR(100)'
def csv2sql(file=None, db_schema=None, tablename=None, encoding='utf-8', snifflimit=512*1024):
try:
conn = engine.connect()
trans = conn.begin()
csv_table = table.Table.from_csv(
file.stream,
name=tablename,
snifflimit=snifflimit,
blanks_as_nulls=True,
infer_types=True,
no_header_row=False,
encoding=encoding
)
sql_table = sql.make_table(
csv_table,
tablename,
False, # self.args.no_constraints
db_schema, # self.args.db_schema
metadata
)