Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
start = time.time()
status_code = 200
templates = []
try:
response_or_template_contexts = await self.data(
request, name, hash, **kwargs
)
if isinstance(response_or_template_contexts, response.HTTPResponse):
return response_or_template_contexts
else:
data, extra_template_data, templates = response_or_template_contexts
except (sqlite3.OperationalError, InvalidSql, DatasetteError) as e:
raise DatasetteError(str(e), title='Invalid SQL', status=400)
except (sqlite3.OperationalError) as e:
raise DatasetteError(str(e))
except DatasetteError:
raise
end = time.time()
data['query_ms'] = (end - start) * 1000
for key in ('source', 'source_url', 'license', 'license_url'):
value = self.ds.metadata.get(key)
if value:
data[key] = value
if as_json:
# Special case for .jsono extension - redirect to _shape=objects
if as_json == '.jsono':
return self.redirect(
request,
path_with_added_args(
request,
{'_shape': 'objects'},
path=request.path.rsplit('.jsono', 1)[0] + '.json'
except KeyError:
as_json = False
extra_template_data = {}
start = time.time()
status_code = 200
templates = []
try:
response_or_template_contexts = await self.data(
request, name, hash, **kwargs
)
if isinstance(response_or_template_contexts, response.HTTPResponse):
return response_or_template_contexts
else:
data, extra_template_data, templates = response_or_template_contexts
except (sqlite3.OperationalError, InvalidSql, DatasetteError) as e:
raise DatasetteError(str(e), title='Invalid SQL', status=400)
except (sqlite3.OperationalError) as e:
raise DatasetteError(str(e))
except DatasetteError:
raise
end = time.time()
data['query_ms'] = (end - start) * 1000
for key in ('source', 'source_url', 'license', 'license_url'):
value = self.ds.metadata.get(key)
if value:
data[key] = value
if as_json:
# Special case for .jsono extension - redirect to _shape=objects
if as_json == '.jsono':
return self.redirect(
request,
path_with_added_args(
if not is_view:
table_rows_count = table_info['count']
sortable_columns = self.sortable_columns_for_table(name, table, use_rowid)
# Allow for custom sort order
sort = special_args.get('_sort')
if sort:
if sort not in sortable_columns:
raise DatasetteError('Cannot sort table by {}'.format(sort))
order_by = escape_sqlite(sort)
sort_desc = special_args.get('_sort_desc')
if sort_desc:
if sort_desc not in sortable_columns:
raise DatasetteError('Cannot sort table by {}'.format(sort_desc))
if sort:
raise DatasetteError('Cannot use _sort and _sort_desc at the same time')
order_by = '{} desc'.format(escape_sqlite(sort_desc))
count_sql = 'select count(*) from {table_name} {where}'.format(
table_name=escape_sqlite(table),
where=(
'where {} '.format(' and '.join(where_clauses))
) if where_clauses else '',
)
# _group_count=col1&_group_count=col2
group_count = special_args_lists.get('_group_count') or []
if group_count:
sql = 'select {group_cols}, count(*) as "count" from {table_name} {where} group by {group_cols} order by "count" desc limit 100'.format(
group_cols=', '.join('"{}"'.format(group_count_col) for group_count_col in group_count),
table_name=escape_sqlite(table),
where=(
table_rows_count = None
sortable_columns = set()
if not is_view:
table_rows_count = table_info['count']
sortable_columns = self.sortable_columns_for_table(name, table, use_rowid)
# Allow for custom sort order
sort = special_args.get('_sort')
if sort:
if sort not in sortable_columns:
raise DatasetteError('Cannot sort table by {}'.format(sort))
order_by = escape_sqlite(sort)
sort_desc = special_args.get('_sort_desc')
if sort_desc:
if sort_desc not in sortable_columns:
raise DatasetteError('Cannot sort table by {}'.format(sort_desc))
if sort:
raise DatasetteError('Cannot use _sort and _sort_desc at the same time')
order_by = '{} desc'.format(escape_sqlite(sort_desc))
count_sql = 'select count(*) from {table_name} {where}'.format(
table_name=escape_sqlite(table),
where=(
'where {} '.format(' and '.join(where_clauses))
) if where_clauses else '',
)
# _group_count=col1&_group_count=col2
group_count = special_args_lists.get('_group_count') or []
if group_count:
sql = 'select {group_cols}, count(*) as "count" from {table_name} {where} group by {group_cols} order by "count" desc limit 100'.format(
group_cols=', '.join('"{}"'.format(group_count_col) for group_count_col in group_count),
extra_template_data = {}
start = time.time()
status_code = 200
templates = []
try:
response_or_template_contexts = await self.data(
request, name, hash, **kwargs
)
if isinstance(response_or_template_contexts, response.HTTPResponse):
return response_or_template_contexts
else:
data, extra_template_data, templates = response_or_template_contexts
except (sqlite3.OperationalError, InvalidSql, DatasetteError) as e:
raise DatasetteError(str(e), title='Invalid SQL', status=400)
except (sqlite3.OperationalError) as e:
raise DatasetteError(str(e))
except DatasetteError:
raise
end = time.time()
data['query_ms'] = (end - start) * 1000
for key in ('source', 'source_url', 'license', 'license_url'):
value = self.ds.metadata.get(key)
if value:
data[key] = value
if as_json:
# Special case for .jsono extension - redirect to _shape=objects
if as_json == '.jsono':
return self.redirect(
request,
path_with_added_args(
request,
{'_shape': 'objects'},
def on_exception(request, exception):
title = None
if isinstance(exception, NotFound):
status = 404
info = {}
message = exception.args[0]
elif isinstance(exception, InvalidUsage):
status = 405
info = {}
message = exception.args[0]
elif isinstance(exception, DatasetteError):
status = exception.status
info = exception.error_dict
message = exception.message
title = exception.title
else:
status = 500
info = {}
message = str(exception)
traceback.print_exc()
templates = ['500.html']
if status != 500:
templates = ['{}.html'.format(status)] + templates
info.update({
'ok': False,
'error': message,
'status': status,