Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def scan_index():
    """ Create the database tables, then scan the configured content directory.

    Calls publ.index.scan_index followed by publ.index.background_scan,
    both on config.content_directory.
    """
    publ.model.create_tables()

    indexer = publ.index
    indexer.scan_index(config.content_directory)
    indexer.background_scan(config.content_directory)
def scan_index():
    """ Create the database tables, then scan the configured content directory.

    Calls publ.index.scan_index followed by publ.index.background_scan,
    both on config.content_directory.
    """
    publ.model.create_tables()

    indexer = publ.index
    indexer.scan_index(config.content_directory)
    indexer.background_scan(config.content_directory)
def do_not_cache():
    """ Return True when a page render should NOT be cached.

    The render is treated as uncacheable while the index reports that a
    scan is in progress; otherwise False is returned.
    """
    from . import index  # pylint: disable=cyclic-import

    # A truthy in_progress() means we are reindexing the site
    return bool(index.in_progress())
def _startup(self):
    """ Startup routine for initiating the content indexer

    Inside the application context this sets up the model from
    self.publ_config.database_config and then, when appropriate, scans
    self.publ_config.content_folder and calls index.background_scan on it.
    """
    # NOTE(review): the source excerpt had its indentation stripped; the
    # nesting below (everything inside the app context) is the most natural
    # reading -- confirm against the original file.
    with self.app_context():
        model.setup(self.publ_config.database_config)

        # Imported locally, at the point of use
        import click

        # silent=True yields None instead of raising when no click context
        # is active
        ctx = click.get_current_context(silent=True)

        # Only index when there is no click context or the command is 'run';
        # presumably this keeps other CLI commands from triggering a scan
        # -- TODO confirm
        if not ctx or ctx.info_name == 'run':
            index.scan_index(self.publ_config.content_folder)
            index.background_scan(self.publ_config.content_folder)
def reindex_command(quietly, fresh):
    """ Reindex the content folder, blocking until the indexer is idle.

    quietly -- when true, suppress the progress spinner and the final "Done"
    fresh -- when true, call model.reset() before scanning
    """
    from . import index, model

    if fresh:
        model.reset()

    show_progress = not quietly
    frames = itertools.cycle('|/-\\')

    index.scan_index(config.content_folder, False)

    # Poll the indexer ten times a second until it reports no work in progress
    while index.in_progress():
        if show_progress:
            # A falsy queue size is rendered as an empty string
            pending = index.queue_size() or ''
            status = "\rIndexing... %s %s " % (next(frames), pending)
            print(status, end='', flush=True)
        time.sleep(0.1)

    if show_progress:
        print("Done")
def scan_index():
    """ Create the database tables, then scan the configured content directory.

    Calls publ.index.scan_index followed by publ.index.background_scan,
    both on config.content_directory.
    """
    publ.model.create_tables()

    indexer = publ.index
    indexer.scan_index(config.content_directory)
    indexer.background_scan(config.content_directory)
def render_exception(error):
    """ Catch-all renderer for the top-level exception handler

    error -- the exception being handled

    When the error is a NotFound and the index still has queued work (or a
    scan is in progress), the response is a 503 "Site reindex in progress"
    error page carrying no-cache headers plus Retry-After/Refresh hints.
    """
    LOGGER.debug("render_exception %s %s", type(error), error)

    # Effectively strip off the leading '/', so map_template can decide
    # what the actual category is
    category = request.path[1:]

    qsize = index.queue_size()
    if isinstance(error, http_error.NotFound) and (qsize or index.in_progress()):
        # Retry-After takes a whole number of seconds, so use floor division;
        # true division would emit values such as "6.0" once the queue
        # exceeds 25 items. The delay is never shorter than 5 seconds.
        retry = max(5, qsize // 5)
        return render_error(
            category, "Site reindex in progress", 503,
            exception={
                'type': 'Service Unavailable',
                'str': "The site's contents are not fully known; please try again later (qs="
                       + str(qsize) + ")",
                'qsize': qsize
            },
            headers={
                **NO_CACHE,
                'Retry-After': retry,
                'Refresh': retry
            })
if isinstance(error, http_error.Unauthorized):
# Cache-busting query based on most recently-visible entry
cb_query = queries.build_query({})
cb_query = cb_query.order_by(orm.desc(model.Entry.utc_date))
latest = cb_query.first()
if latest:
LOGGER.debug("Most recently-scheduled entry: %s", latest)
return latest.id
return None
try:
flask.g.stash = {}
text, etag, flask.g.stash = do_render(
template,
user=user.get_active(),
_url=request.url,
_index_time=index.last_indexed(),
_latest=latest_entry(),
_publ_version=__version__.__version__,
**kwargs)
return text, etag
except queries.InvalidQueryError as err:
raise http_error.BadRequest(str(err))
'/_logout/',
'/_logout/'
]:
self.add_url_rule(route, 'logout', logout, methods=['GET', 'POST'])
for route in [
'/_admin',
'/_admin/'
]:
self.add_url_rule(route, 'admin', rendering.admin_dashboard)
self.before_request(user.log_user)
self.after_request(tokens.inject_auth_headers)
self._maint = maintenance.Maintenance(self)
self.indexer = index.Indexer(self, self.publ_config.index_wait_time)
if self.publ_config.index_rescan_interval:
self._maint.register(functools.partial(index.scan_index,
self.publ_config.content_folder),
self.publ_config.index_rescan_interval)
if self.publ_config.image_cache_interval and self.publ_config.image_cache_age:
self._maint.register(functools.partial(image.clean_cache,
self.publ_config.image_cache_age),
self.publ_config.image_cache_interval)
if self.publ_config.auth_log_prune_interval and self.publ_config.auth_log_prune_age:
self._maint.register(functools.partial(user.prune_log,
self.publ_config.auth_log_prune_age),
self.publ_config.auth_log_prune_interval)