Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _asset_urls(self, key, template, context):
# Flatten list-of-lists from plugins:
seen_urls = set()
for url_or_dict in itertools.chain(
itertools.chain.from_iterable(
getattr(pm.hook, key)(
template=template.name,
database=context.get("database"),
table=context.get("table"),
datasette=self.ds,
)
),
(self.ds.metadata(key) or []),
):
if isinstance(url_or_dict, dict):
url = url_or_dict["url"]
sri = url_or_dict.get("sri")
else:
url = url_or_dict
sri = None
if url in seen_urls:
continue
for template_name in templates
]
body_scripts = []
# pylint: disable=no-member
for script in pm.hook.extra_body_script(
template=template.name,
database=context.get("database"),
table=context.get("table"),
view_name=self.name,
datasette=self.ds,
):
body_scripts.append(jinja2.Markup(script))
extra_template_vars = {}
# pylint: disable=no-member
for extra_vars in pm.hook.extra_template_vars(
template=template.name,
database=context.get("database"),
table=context.get("table"),
view_name=self.name,
request=request,
datasette=self.ds,
):
if callable(extra_vars):
extra_vars = extra_vars()
if asyncio.iscoroutine(extra_vars):
extra_vars = await extra_vars
assert isinstance(extra_vars, dict), "extra_vars is of type {}".format(
type(extra_vars)
)
extra_template_vars.update(extra_vars)
flat_pks_quoted=path_from_row_pks(row, pks, not pks),
)
),
}
)
for value, column_dict in zip(row, columns):
column = column_dict["name"]
if link_column and len(pks) == 1 and column == pks[0]:
# If there's a simple primary key, don't repeat the value as it's
# already shown in the link column.
continue
# First let the plugins have a go
# pylint: disable=no-member
plugin_display_value = pm.hook.render_cell(
value=value,
column=column,
table=table,
database=database,
datasette=self.ds,
)
if plugin_display_value is not None:
display_value = plugin_display_value
elif isinstance(value, bytes):
display_value = jinja2.Markup(
"<Binary data: {} byte{}>".format(
len(value), "" if len(value) == 1 else "s"
)
)
elif isinstance(value, dict):
# It's an expanded foreign key - display link to other row
r"/(?P[^/]+)/(?P[^/]+?)/(?P[^/]+?)(?P"
+ renderer_regex
+ r")?$",
)
self.register_custom_units()
async def setup_db():
    """Run ``table_counts`` once for every database that is not mutable.

    Passed as the ``on_startup`` callback of the ASGI lifespan wrapper.
    ``self`` is taken from the enclosing scope.
    """
    # First time server starts up, calculate table counts for immutable databases
    # Only the database objects are needed, so iterate the values directly.
    for database in self.databases.values():
        if not database.is_mutable:
            # NOTE(review): limit looks like a time budget in milliseconds
            # (one hour) - confirm against Database.table_counts.
            await database.table_counts(limit=60 * 60 * 1000)
asgi = AsgiLifespan(
AsgiTracer(DatasetteRouter(self, routes)), on_startup=setup_db
)
for wrapper in pm.hook.asgi_wrapper(datasette=self):
asgi = wrapper(asgi)
return asgi
<table></table>
def prepare_connection(self, conn):
    """Configure a newly opened SQLite connection.

    Steps, in order:

    1. Use ``sqlite3.Row`` as the row factory and decode text as UTF-8,
       replacing undecodable bytes instead of raising.
    2. Register each ``(name, num_args, func)`` entry of
       ``self.sqlite_functions`` as a SQL function.
    3. If ``self.sqlite_extensions`` is non-empty, enable extension loading
       and load each listed extension.
    4. If the ``cache_size_kb`` config value is truthy, apply it through
       ``PRAGMA cache_size``.
    5. Call the ``prepare_connection`` plugin hook with the connection.

    Args:
        conn: The ``sqlite3`` connection to configure (modified in place).
    """
    conn.row_factory = sqlite3.Row
    conn.text_factory = lambda x: str(x, "utf-8", "replace")
    for name, num_args, func in self.sqlite_functions:
        conn.create_function(name, num_args, func)
    if self.sqlite_extensions:
        conn.enable_load_extension(True)
        for extension in self.sqlite_extensions:
            # Bind the path as a parameter rather than formatting it into
            # the SQL text, so a path containing a quote character cannot
            # break (or alter) the statement.
            conn.execute("SELECT load_extension(?)", [extension])
    cache_size_kb = self.config("cache_size_kb")
    if cache_size_kb:
        # The leading minus sign makes SQLite read the value as a size in
        # KiB instead of a page count. The value is formatted into the
        # statement text, as before.
        conn.execute("PRAGMA cache_size=-{}".format(cache_size_kb))
    # pylint: disable=no-member
    pm.hook.prepare_connection(conn=conn)
def register_renderers(self):
    """Register output renderers which output data in custom formats.

    The built-in ``json`` renderer is registered first. Each result of the
    ``register_output_renderer`` plugin hook is then added: a result may be
    a single renderer dict or a list of them, and every renderer dict must
    provide ``"extension"`` and ``"callback"`` keys. A plugin renderer whose
    extension matches one registered earlier replaces it.
    """
    # Built-in renderers
    self.renderers["json"] = json_renderer

    # Hooks
    hook_renderers = []
    # pylint: disable=no-member
    for hook in pm.hook.register_output_renderer(datasette=self):
        # isinstance (rather than an exact type comparison) also flattens
        # list subclasses returned by plugins.
        if isinstance(hook, list):
            hook_renderers.extend(hook)
        else:
            hook_renderers.append(hook)

    for renderer in hook_renderers:
        self.renderers[renderer["extension"]] = renderer["callback"]
async def extra_template():
display_rows = []
for row in results.rows:
display_row = []
for column, value in zip(results.columns, row):
display_value = value
# Let the plugins have a go
# pylint: disable=no-member
plugin_value = pm.hook.render_cell(
value=value,
column=column,
table=None,
database=database,
datasette=self.ds,
)
if plugin_value is not None:
display_value = plugin_value
else:
if value in ("", None):
display_value = jinja2.Markup(" ")
elif is_url(str(display_value).strip()):
display_value = jinja2.Markup(
'<a href="{url}">{url}</a>'.format(
url=jinja2.escape(value.strip())
)
count_rows = list(
await self.ds.execute(database, count_sql, from_sql_params)
)
filtered_table_rows_count = count_rows[0][0]
except QueryInterrupted:
pass
# facets support
if not self.ds.config("allow_facet") and any(
arg.startswith("_facet") for arg in request.args
):
raise DatasetteError("_facet= is not allowed", status=400)
# pylint: disable=no-member
facet_classes = list(
itertools.chain.from_iterable(pm.hook.register_facet_classes())
)
facet_results = {}
facets_timed_out = []
facet_instances = []
for klass in facet_classes:
facet_instances.append(
klass(
self.ds,
request,
database,
sql=sql_no_limit,
params=params,
table=table,
metadata=table_metadata,
row_count=filtered_table_rows_count,
)