Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): truncated fragment — the expected-SQL string literal and the
# close of assertEqual are cut off below; do not edit without the full file.
def test_make_create_table_statement(self):
# Build a SQLAlchemy table from the test fixture csv, then render its DDL.
sql_table = sql.make_table(self.csv_table, 'csvsql')
statement = sql.make_create_table_statement(sql_table)
# Compare rendered DDL against the expected CREATE TABLE text
# (the u"""...""" literal continues past the end of this view).
self.assertEqual(statement,
u"""CREATE TABLE test_table (
\ttext VARCHAR(17) NOT NULL,
# NOTE(review): extraction stripped the indentation — the lines below would
# normally be indented under `try:`, and the except handler's body is cut off
# at the end of this fragment.
try:
# Open a connection and begin an explicit transaction so the table create
# and the bulk insert commit (or roll back) together.
conn = engine.connect()
trans = conn.begin()
# Sniff the uploaded csv stream: infer column types, treat blank cells as
# NULL, and read the first row as the header.
csv_table = table.Table.from_csv(
file.stream,
name=tablename,
snifflimit=snifflimit,
blanks_as_nulls=True,
infer_types=True,
no_header_row=False,
encoding=encoding
)
# Translate the inferred csv schema into a SQLAlchemy table bound to
# `metadata`; constraints are enabled (third argument False).
sql_table = sql.make_table(
csv_table,
tablename,
False, # self.args.no_constraints
db_schema, # self.args.db_schema
metadata
)
sql_table.create()
# Bulk insert: one execute() with a list of header->value dicts, one per row.
insert = sql_table.insert()
headers = csv_table.headers()
conn.execute(insert, [dict(zip(headers, row)) for row in csv_table.to_rows()])
trans.commit()
except Exception as e:
# NOTE(review): near-verbatim duplicate of the fragment above (only the
# inline comment spacing differs); same stripped indentation, same truncated
# except handler at the end.
try:
# Connection + explicit transaction: create and insert commit atomically.
conn = engine.connect()
trans = conn.begin()
# Infer the csv schema from the uploaded stream (types sniffed, blanks
# treated as NULL, header row honored).
csv_table = table.Table.from_csv(
file.stream,
name=tablename,
snifflimit=snifflimit,
blanks_as_nulls=True,
infer_types=True,
no_header_row=False,
encoding=encoding
)
# Build the SQLAlchemy table for the target schema.
sql_table = sql.make_table(
csv_table,
tablename,
False, #self.args.no_constraints
db_schema, #self.args.db_schema
metadata
)
sql_table.create()
# Single bulk insert of all rows as header->value dicts.
insert = sql_table.insert()
headers = csv_table.headers()
conn.execute(insert, [dict(zip(headers, row)) for row in csv_table.to_rows()])
trans.commit()
except Exception as e:
# NOTE(review): truncated fragment with stripped indentation; lines below the
# `if parsed_length:` test jump into keyword arguments from a *different*
# snippet (an attribute-construction call whose opening is not visible here).
def _get_column_types(self):
self.tracker.forward('Inferring datatype of columns')
# Load the csv and use csvkit's sql.make_table utility
# to infer the datatypes of the columns.
with open(self.path,'r') as f:
csv_table = table.Table.from_csv(f, delimiter=',')
sql_table = sql.make_table(csv_table)
for i, column in enumerate(sql_table.columns):
# Clean the type and name values
raw_type = str(column.type)
# Strip a parenthesized length, e.g. "VARCHAR(17)" -> "VARCHAR".
clean_type = re.sub(re.compile(r'\(\w+\)'), '', raw_type)
# Temporary fix for issue #19
if raw_type == 'BOOLEAN':
raw_type = 'VARCHAR(10)'
if raw_type == 'DATETIME':
# Dumb guess at the maximum length of a datetime field. Find a
# better way!
raw_type = 'VARCHAR(100)'
# Pull the declared length back out of the (possibly remapped) type.
parsed_length = re.search(re.compile(r'\((\w+)\)'), raw_type)
if parsed_length:
# NOTE(review): orphaned kwargs from another snippet begin here.
attribute_type=column.type.__name__,
display_order=column.order,
visible=is_visible)
except:
# NOTE(review): orphaned handler — its matching `try:` is above this view.
# The bare `except:` here also swallows SystemExit/KeyboardInterrupt; prefer
# `except Exception:` once the full function is in view.
except:
data_table.delete() # Deleting DataTable also deletes related DataTableAttribute objects
# Report the exception *type* (sys.exc_info()[0]) in the error message.
err_msg = 'Failed to convert csv file to table. Error: %s' % str(sys.exc_info()[0])
LOGGER.error(err_msg)
return None, err_msg
# Progress breadcrumb between processing phases.
msg('process_csv_file 3')
# -----------------------------------------------------
# Generate SQL to create table from csv file
# -----------------------------------------------------
try:
sql_table = sql.make_table(csv_table, table_name)
create_table_sql = sql.make_create_table_statement(sql_table, dialect="postgresql")
data_table.create_table_sql = create_table_sql
data_table.save()
except:
data_table.delete()
err_msg = 'Generate SQL to create table from csv file. Error: %s' % str(sys.exc_info()[0])
LOGGER.error(err_msg)
return None, err_msg
# Progress breadcrumb between processing phases.
msg('process_csv_file 4')
# -----------------------------------------------------
# Execute the SQL and Create the Table (No data is loaded)
# -----------------------------------------------------
# Open a raw psycopg2 connection to the datastore; the connection string
# helper picks the dataverse DB when is_dataverse_db is set.
conn = psycopg2.connect(get_datastore_connection_string(is_dataverse_db=is_dataverse_db))
def create_table(self, cursor, schema, table, virtual_table):
    """Create ``schema.table`` in the target database from a csvkit table.

    Uses csvkit's sql helpers to infer a SQLAlchemy table from
    ``virtual_table``, renders PostgreSQL DDL for it, and executes that DDL
    on the supplied DB-API ``cursor``.  No rows are loaded here.

    :param cursor: open DB-API cursor on the target database
    :param schema: destination schema name
    :param table: destination table name (used for logging only here)
    :param virtual_table: csvkit table whose columns define the schema
    """
    sql_table = csv_sql.make_table(virtual_table, db_schema=schema)
    create_table_sql = csv_sql.make_create_table_statement(
        sql_table, dialect='postgresql'
    )
    # Lazy %-style args: the message is only formatted when INFO is enabled,
    # instead of always paying for str.format before the level check.
    logger.info("Creating table %s.%s", schema, table)
    cursor.execute(create_table_sql)