Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def import_file(self, table_name, file_path, file_format='CSV',
delimiter=',', header=True, encoding='ISO-8859-1',
quote_character='"'):
header_option = 'HEADER' if header else ''
words = table_name.split('.')
for word in words[:-1]:
self._check_for_injections(word)
self._validate_table_name(words[-1])
self._check_for_injections(file_format)
query = 'COPY %s FROM %s WITH %s %s DELIMITER %s ENCODING %s QUOTE %s;'
params = (AsIs(table_name), file_path, AsIs(file_format),
AsIs(header_option), delimiter, encoding, quote_character)
try:
self.execute_sql(query, params)
except Exception as e:
self.execute_sql('DROP TABLE IF EXISTS %s', (AsIs(table_name),))
raise ImportError(e)
for (fid, attrs, wkt) in any_features_list:
for idx, l_attrs in enumerate(attrs):
if l_attrs:
attrs[idx] = [i if i else None for i in l_attrs]
if attrs[idx] == [None]:
attrs[idx] = None
else:
attrs[idx] = [a for a in attrs[idx] if a]
data.append(tuple((attrs, wkt)))
args_str = ','.join(
[rmv_parenthesis(cur.mogrify("%s,ST_GeomFromText(%s,%s))", (tuple(attrs), wkt, AsIs(crs_id)))) for
(attrs, wkt) in tuple(data)])
ins_str = cur.mogrify("""INSERT INTO %s.%s VALUES """, (AsIs(schema), AsIs(table_name)))
cur.execute(ins_str + args_str)
con.commit()
con.close()
print "success!"
uri = QgsDataSourceURI()
# set host name, port, database name, username and password
uri.setConnection(host, port, dbname, user, password)
# set database schema, table name, geometry column and optionally
uri.setDataSource(schema, table_name, "geom")
return QgsVectorLayer(uri.uri(), table_name, "postgres")
except psycopg2.DatabaseError, e:
return e
def adapt_numpy_int64(numpy_int64):
return AsIs(numpy_int64)
register_adapter(numpy.int64, adapt_numpy_int64)
def rename_repo(self, repo, new_name):
self._check_for_injections(repo)
self._check_for_injections(new_name)
query = 'ALTER SCHEMA %s RENAME TO %s'
params = (AsIs(repo), AsIs(new_name))
res = self.execute_sql(query, params)
return res['status']
def delete_view(self, repo, view, force=False):
self._check_for_injections(repo)
self._validate_table_name(view)
force_param = 'RESTRICT'
if force:
force_param = 'CASCADE'
query = ('DROP VIEW %s.%s.%s %s')
params = (AsIs(self.repo_base), AsIs(repo), AsIs(view),
AsIs(force_param))
res = self.execute_sql(query, params)
return res['status']
def get_equivalence_size(self, eid):
try:
query = "SELECT COUNT (*) FROM %s.%s WHERE EID = %s;"
params = [AsIs(self.repo), AsIs(self.name), AsIs(eid)]
result = self.backend.execute_sql(query, params)["tuples"]
if len(result) > 0:
return int(result[0][0])
return 0
except Exception as e:
raise e
def describe_table(self, repo, table, detail=False):
query = ("SELECT %s "
"FROM information_schema.columns "
"WHERE table_schema = %s and table_name = %s;")
params = None
if detail:
params = (AsIs('*'), repo, table)
else:
params = (AsIs('column_name, data_type'), repo, table)
res = self.execute_sql(query, params)
return res['tuples']
def find_licenses(self):
'''
find all licenses
'''
query = (
'SELECT license_id, license_name, pii_def, '
'pii_anonymized, pii_removed FROM %s.%s;')
params = (AsIs(settings.LICENSE_SCHEMA), AsIs(settings.LICENSE_TABLE))
res = self.execute_sql(query, params)
return res['tuples']
# remove collaborator access to the database
if revoke_collaborators:
all_users = self.list_all_users()
for user in all_users:
query = "REVOKE ALL ON DATABASE %s FROM %s;"
params = (AsIs(database), AsIs(user))
self.execute_sql(query, params)
# Make sure to close all extant connections to this database or the
# drop will fail.
_close_all_connections(database)
# drop database
query = 'DROP DATABASE %s;'
params = (AsIs(database),)
try:
return self.execute_sql(query, params)
except psycopg2.ProgrammingError as e:
print(e)
print('this probably happened because the postgres role'
'exists, but a database of the same name does not.')