Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"{fts_pk} in (select rowid from {fts_table} where {fts_table} match :search)".format(
fts_table=escape_sqlite(fts_table), fts_pk=escape_sqlite(fts_pk)
)
)
# Record the table-wide search in the human-readable description and bind the
# search text to the :search SQL parameter.
extra_human_descriptions.append('search matches "{}"'.format(search))
params["search"] = search
else:
# More complex: search against specific columns
for i, (key, search_text) in enumerate(search_args.items()):
# The column name is whatever follows "_search_" in the argument key.
search_col = key.split("_search_", 1)[1]
# Reject columns that are not present on the FTS table.
if search_col not in await db.table_columns(fts_table):
raise DatasetteError("Cannot search by that column", status=400)
# Each searched column gets its own numbered parameter, :search_<i>,
# so several column searches can be combined in one query.
where_clauses.append(
"rowid in (select rowid from {fts_table} where {search_col} match :search_{i})".format(
fts_table=escape_sqlite(fts_table),
search_col=escape_sqlite(search_col),
i=i,
)
)
extra_human_descriptions.append(
'search column "{}" matches "{}"'.format(
search_col, search_text
)
)
# Bind the value under the same name used in the clause above.
params["search_{}".format(i)] = search_text
sortable_columns = set()
sortable_columns = await self.sortable_columns_for_table(
database, table, use_rowid
)
# Builds a dict, keyed by table name, describing every table listed in
# sqlite_master for the given connection.
# NOTE(review): only part of this function is visible here (no return
# statement is shown), so the final return value is not documented.
def inspect_tables(conn, database_metadata):
" List tables and their row counts, excluding uninteresting tables. "
tables = {}
# Names of every sqlite_master entry whose type is "table".
table_names = [
r["name"]
for r in conn.execute('select * from sqlite_master where type="table"')
]
for table in table_names:
# Per-table metadata from database_metadata["tables"], or {} when absent.
table_metadata = database_metadata.get("tables", {}).get(table, {})
try:
count = conn.execute(
"select count(*) from {}".format(escape_sqlite(table))
).fetchone()[0]
except sqlite3.OperationalError:
# This can happen when running against a FTS virtual table
# e.g. "select count(*) from some_fts;"
# The table is still recorded, with a count of 0.
count = 0
column_names = table_columns(conn, table)
tables[table] = {
"name": table,
"columns": column_names,
"primary_keys": detect_primary_keys(conn, table),
"count": count,
# A missing or falsy "hidden" metadata value is stored as False.
"hidden": table_metadata.get("hidden") or False,
"fts_table": detect_fts(conn, table),
}
)
)
# Parameters are named p<N>, where N is the number of parameters bound so far.
params['p{}'.format(len(params))] = components[0]
else:
# Apply the tie-breaker based on primary keys
# (only when there is exactly one component per primary key column)
if len(components) == len(pks):
param_len = len(params)
next_by_pk_clauses.append(compound_keys_after_sql(pks, param_len))
# Bind one parameter per component, numbered from param_len upwards.
for i, pk_value in enumerate(components):
params['p{}'.format(param_len + i)] = pk_value
# Now add the sort SQL, which may incorporate next_by_pk_clauses
if sort or sort_desc:
# Keep rows strictly past sort_value in the sort direction, plus rows tied
# on the sort column that also satisfy the primary key clauses.
where_clauses.append(
'({column} {op} :p{p} or ({column} = :p{p} and {next_clauses}))'.format(
column=escape_sqlite(sort or sort_desc),
op='>' if sort else '<',
p=len(params),
next_clauses=' and '.join(next_by_pk_clauses),
)
)
# len(params) is unchanged since the clause was formatted, so this key
# matches the :p<N> placeholder used above.
params['p{}'.format(len(params))] = sort_value
else:
where_clauses.extend(next_by_pk_clauses)
# Assemble the final SQL fragments; each keeps a trailing space so the pieces
# can be concatenated directly.
where_clause = ''
if where_clauses:
where_clause = 'where {} '.format(' and '.join(where_clauses))
if order_by:
order_by = 'order by {} '.format(order_by)
hash,
canned_query["sql"],
metadata=canned_query,
editable=False,
canned_query=table,
)
# Resolve the database, then work out whether the name refers to a view, a
# table, or nothing at all.
db = self.ds.databases[database]
is_view = bool(await db.get_view_definition(table))
table_exists = bool(await db.table_exists(table))
if not is_view and not table_exists:
raise NotFound("Table not found: {}".format(table))
pks = await db.primary_keys(table)
table_columns = await db.table_columns(table)
select_columns = ", ".join(escape_sqlite(t) for t in table_columns)
# Use rowid when there are no primary keys and this is not a view.
use_rowid = not pks and not is_view
if use_rowid:
# Select rowid explicitly and use it for the default ordering.
select = "rowid, {}".format(select_columns)
order_by = "rowid"
order_by_pks = "rowid"
else:
select = select_columns
order_by_pks = ", ".join([escape_sqlite(pk) for pk in pks])
order_by = order_by_pks
# Views get no default ordering.
if is_view:
order_by = ""
# Ensure we don't drop anything with an empty value e.g. ?name__exact=
args = RequestParameters(
# Row count is read from the table_info mapping.
table_rows_count = table_info['count']
sortable_columns = self.sortable_columns_for_table(name, table, use_rowid)
# Allow for custom sort order
sort = special_args.get('_sort')
if sort:
# Only columns reported as sortable may be used.
if sort not in sortable_columns:
raise DatasetteError('Cannot sort table by {}'.format(sort))
order_by = escape_sqlite(sort)
sort_desc = special_args.get('_sort_desc')
if sort_desc:
if sort_desc not in sortable_columns:
raise DatasetteError('Cannot sort table by {}'.format(sort_desc))
# _sort and _sort_desc are mutually exclusive.
if sort:
raise DatasetteError('Cannot use _sort and _sort_desc at the same time')
order_by = '{} desc'.format(escape_sqlite(sort_desc))
# Count query over the same table and where clauses; the where fragment is
# empty when there are no clauses.
count_sql = 'select count(*) from {table_name} {where}'.format(
table_name=escape_sqlite(table),
where=(
'where {} '.format(' and '.join(where_clauses))
) if where_clauses else '',
)
# _group_count=col1&_group_count=col2
group_count = special_args_lists.get('_group_count') or []
if group_count:
# Groups by the requested columns and returns at most 100 groups, largest
# count first.
# NOTE(review): the _group_count values appear to come from request
# arguments and are wrapped in double quotes without escaping embedded
# quotes, unlike the table name, which goes through escape_sqlite. Confirm
# they are validated before reaching this point.
sql = 'select {group_cols}, count(*) as "count" from {table_name} {where} group by {group_cols} order by "count" desc limit 100'.format(
group_cols=', '.join('"{}"'.format(group_count_col) for group_count_col in group_count),
table_name=escape_sqlite(table),
where=(
'where {} '.format(' and '.join(where_clauses))
# For a row identified by a single primary key value, counts the rows in other
# tables whose foreign key column equals that value.
# NOTE(review): the remainder of this method is not visible here, so the
# final return value is not documented.
async def foreign_key_tables(self, database, table, pk_values):
# Only a single primary key value is handled; anything else yields [].
if len(pk_values) != 1:
return []
db = self.ds.databases[database]
all_foreign_keys = await db.get_all_foreign_keys()
# "incoming" entries are read for their other_table / other_column fields.
foreign_keys = all_foreign_keys[table]["incoming"]
if len(foreign_keys) == 0:
return []
# One scalar count(*) subquery per incoming foreign key, all sharing the same
# :id parameter, so every count comes back in a single row.
sql = "select " + ", ".join(
[
"(select count(*) from {table} where {column}=:id)".format(
table=escape_sqlite(fk["other_table"]),
column=escape_sqlite(fk["other_column"]),
)
for fk in foreign_keys
]
)
try:
rows = list(await self.ds.execute(database, sql, {"id": pk_values[0]}))
except sqlite3.OperationalError:
# Almost certainly hit the timeout
return []
# Map (other_table, other_column) -> count by pairing the foreign keys with
# the columns of the single result row, in the same order.
foreign_table_counts = dict(
zip(
[(fk["other_table"], fk["other_column"]) for fk in foreign_keys],
list(rows[0]),
)
)
foreign_key
for foreign_key in foreign_keys
if foreign_key["column"] == column
][0]
except IndexError:
return {}
# Ask the database which column of the referenced table to use as a label.
label_column = await db.label_column_for_table(fk["other_table"])
if not label_column:
# No label column: each value is labelled with its own string form.
return {(fk["column"], value): str(value) for value in values}
labeled_fks = {}
# Look up the label for every distinct value in a single query, with one "?"
# placeholder per distinct value.
sql = """
select {other_column}, {label_column}
from {other_table}
where {other_column} in ({placeholders})
""".format(
other_column=escape_sqlite(fk["other_column"]),
label_column=escape_sqlite(label_column),
other_table=escape_sqlite(fk["other_table"]),
placeholders=", ".join(["?"] * len(set(values))),
)
try:
results = await self.execute(database, sql, list(set(values)))
except QueryInterrupted:
# An interrupted query is ignored, so no labels are added for these values.
pass
else:
# Keys are (column, referenced id) pairs; values are the label text.
# NOTE(review): the loop variable `id` shadows the builtin of the same name.
for id, value in results:
labeled_fks[(fk["column"], id)] = value
return labeled_fks
# Now add the sort SQL, which may incorporate next_by_pk_clauses
if sort or sort_desc:
# A sort_value of None gets dedicated clauses built from "is null" /
# "is not null" tests rather than a bound comparison parameter.
if sort_value is None:
if sort_desc:
# Just items where column is null ordered by pk
where_clauses.append(
"({column} is null and {next_clauses})".format(
column=escape_sqlite(sort_desc),
next_clauses=" and ".join(next_by_pk_clauses),
)
)
else:
# Ascending: every non-null row qualifies, plus null rows that satisfy
# the primary key clauses.
where_clauses.append(
"({column} is not null or ({column} is null and {next_clauses}))".format(
column=escape_sqlite(sort),
next_clauses=" and ".join(next_by_pk_clauses),
)
)
else:
# Non-null sort value: rows strictly past it in the sort direction, or rows
# tied on the sort column that satisfy the primary key clauses. For a
# descending sort, rows where the column is null are also included.
where_clauses.append(
"({column} {op} :p{p}{extra_desc_only} or ({column} = :p{p} and {next_clauses}))".format(
column=escape_sqlite(sort or sort_desc),
op=">" if sort else "<",
p=len(params),
extra_desc_only=""
if sort
else " or {column2} is null".format(
column2=escape_sqlite(sort or sort_desc)
),
next_clauses=" and ".join(next_by_pk_clauses),
)
)
# Human-readable description of the search, and the bound :search parameter.
search_description = 'search matches "{}"'.format(search)
params['search'] = search
# Defaults used for views: no row count and no sortable columns.
table_rows_count = None
sortable_columns = set()
if not is_view:
table_rows_count = table_info['count']
sortable_columns = self.sortable_columns_for_table(name, table, use_rowid)
# Allow for custom sort order
sort = special_args.get('_sort')
if sort:
# Only columns reported as sortable may be used.
if sort not in sortable_columns:
raise DatasetteError('Cannot sort table by {}'.format(sort))
order_by = escape_sqlite(sort)
sort_desc = special_args.get('_sort_desc')
if sort_desc:
if sort_desc not in sortable_columns:
raise DatasetteError('Cannot sort table by {}'.format(sort_desc))
# _sort and _sort_desc are mutually exclusive.
if sort:
raise DatasetteError('Cannot use _sort and _sort_desc at the same time')
order_by = '{} desc'.format(escape_sqlite(sort_desc))
# Count query over the same table and where clauses; the where fragment is
# empty when there are no clauses.
count_sql = 'select count(*) from {table_name} {where}'.format(
table_name=escape_sqlite(table),
where=(
'where {} '.format(' and '.join(where_clauses))
) if where_clauses else '',
)
# _group_count=col1&_group_count=col2
break
m.update(data)
# List tables and their row counts
tables = {}
views = []
# The database file is opened through a URI with immutable=1.
with sqlite3.connect('file:{}?immutable=1'.format(path), uri=True) as conn:
self.prepare_connection(conn)
# Names of every sqlite_master entry whose type is "table".
table_names = [
r['name']
for r in conn.execute('select * from sqlite_master where type="table"')
]
views = [v[0] for v in conn.execute('select name from sqlite_master where type = "view"')]
for table in table_names:
try:
count = conn.execute(
'select count(*) from {}'.format(escape_sqlite(table))
).fetchone()[0]
except sqlite3.OperationalError:
# This can happen when running against an FTS virtual table
# e.g. "select count(*) from some_fts;"
# The table is still processed, with a count of 0.
count = 0
# Figure out primary keys. The last field of each PRAGMA table_info row is
# "pk": 0 for columns outside the primary key, otherwise the column's 1-based
# position within it. Keep only key columns and order them by that position.
# The table name is written as a double-quoted identifier with embedded
# double quotes doubled, so a name containing '"' no longer yields invalid SQL.
table_info_rows = [
    row
    for row in conn.execute(
        'PRAGMA table_info("{}")'.format(table.replace('"', '""'))
    ).fetchall()
    if row[-1]
]
table_info_rows.sort(key=lambda row: row[-1])
# Field 1 of a table_info row is the column name.
primary_keys = [str(r[1]) for r in table_info_rows]
label_column = None
# If table has two columns, one of which is ID, then label_column is the other one