Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def load_metafile(filepath):
    """ Load a metadata file from the filesystem

    Arguments:

    filepath -- path of the metadata file; it is opened as UTF-8 text

    Returns the message object produced by email.message_from_file, or None
    if the file does not exist. In the missing-file case a warning is logged
    and the Category rows for this path, plus the PathAlias rows whose
    category points at this path, are deleted and committed.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as file:
            return email.message_from_file(file)
    except FileNotFoundError:
        LOGGER.warning("Category file %s not found", filepath)

        # SQLite doesn't support cascading deletes, so clean up the dependent
        # aliases by hand before removing the category itself
        orm.delete(pa for pa in model.PathAlias if pa.category.file_path == filepath)
        orm.delete(c for c in model.Category if c.file_path == filepath)
        orm.commit()

    return None
def scan_index():
    """ Create the database tables and index the content directory

    Calls publ.model.create_tables(), then passes config.content_directory
    to publ.index.scan_index() and to publ.index.background_scan(), in that
    order.
    """
    publ.model.create_tables()
    publ.index.scan_index(config.content_directory)
    publ.index.background_scan(config.content_directory)
class FilterCombiner(Enum):
    """ Which operation to use when combining multiple operations in a filter

    Each operation has a primary name and a synonym. The synonym is bound to
    the primary's value, so Enum treats it as an alias of the same member
    (e.g. FilterCombiner.OR is FilterCombiner.ANY).
    """
    ANY = 0     # any of the criteria are present
    OR = ANY    # synonym for ANY

    ALL = 1     # all of the criteria are present
    AND = ALL   # synonym for ALL

    NONE = 2    # none of the criteria are present
    NOT = NONE  # synonym for NONE
# Sort keys shared by the two lookup tables below; every ordering uses the
# entry id as its final tie-breaker.
_DATE_ASCENDING = (model.Entry.local_date, model.Entry.id)
_DATE_DESCENDING = (orm.desc(model.Entry.local_date), orm.desc(model.Entry.id))
_TITLE_ASCENDING = (model.Entry.sort_title, model.Entry.id)
_TITLE_DESCENDING = (orm.desc(model.Entry.sort_title), orm.desc(model.Entry.id))

# Ordering queries for different sort orders
ORDER_BY = {
    'newest': _DATE_DESCENDING,
    'oldest': _DATE_ASCENDING,
    'title': _TITLE_ASCENDING,
}

# The same sort orders, each traversed in the opposite direction
REVERSE_ORDER_BY = {
    'newest': _DATE_ASCENDING,
    'oldest': _DATE_DESCENDING,
    'title': _TITLE_DESCENDING,
}
def where_entry_visible(query, date=None):
""" Generate a where clause for currently-visible entries
Arguments:
def _first(**spec) -> typing.Optional[entry.Entry]:
    """ Get the earliest entry in this category, optionally including subcategories

    Arguments:

    spec -- keyword arguments, passed on as a single dict to self._entries()

    Returns the result of entry.Entry.load() for the first record when
    ordered by (local_date, id), or None if the query yields no records.
    """
    # NOTE(review): `self` is not a parameter of this function, so it is
    # presumably a closure defined inside a method -- confirm against the
    # enclosing scope.
    earliest = self._entries(spec).order_by(model.Entry.local_date,
                                            model.Entry.id)[:1]
    for record in earliest:
        return entry.Entry.load(record)
    return None
if (e.category == category
or e.category.startswith(category + '/'))
and e.visible):
raise http_error.NotFound("No such category")
if not template:
template = Category.load(category).get('Index-Template') or 'index'
tmpl = map_template(category, template)
if not tmpl:
# this might actually be a malformed category URL
test_path = '/'.join((category, template)) if category else template
LOGGER.debug("Checking for malformed category %s", test_path)
record = orm.select(
e for e in model.Entry if e.category == test_path).exists() # type:ignore
if record:
return redirect(url_for('category', category=test_path, **request.args))
# nope, we just don't know what this is
raise http_error.NotFound(f"No such view '{template}'")
view_spec = view.parse_view_spec(request.args)
view_spec['category'] = category
view_obj = view.View.load(view_spec)
rendered, etag = render_publ_template(
tmpl,
category=Category.load(category),
view=view_obj)
if request.if_none_match.contains(etag):
def reindex_command(quietly, fresh):
    """ Command for reindexing the index

    Arguments:

    quietly -- if true, print neither the progress spinner nor the final "Done"
    fresh -- if true, call model.reset() before scanning

    Starts a scan of config.content_folder and polls index.in_progress()
    every 0.1 seconds until it reports false.
    """
    from . import index, model

    if fresh:
        model.reset()

    spinner = itertools.cycle('|/-\\')

    # NOTE(review): the meaning of the positional False is not visible from
    # this block -- check index.scan_index's signature.
    index.scan_index(config.content_folder, False)

    while index.in_progress():
        if not quietly:
            # show the queue size only when it is non-zero
            qlen = index.queue_size() or ''
            # '\r' with end='' redraws the status in place on a single line
            print("\rIndexing... %s %s " % (next(spinner), qlen), end='', flush=True)
        # sleep on every pass, quiet or not, so the poll loop never spins
        time.sleep(0.1)

    if not quietly:
        print("Done")
def __init__(self, create_key, record):
    """ Instantiate the Entry wrapper

    Arguments:

    create_key -- must equal Entry.load.__name__; this is how the
        constructor checks that it was reached through Entry.load()
    record -- the index record to wrap; its id and file_path are read here
    """
    assert create_key == Entry.load.__name__, "Entry must be created with Entry.load()"

    LOGGER.debug('init entry %d', record.id)
    self._record = record  # index record

    # the lookup may come back falsy, so guard before reading .fingerprint
    self._fingerprint = model.FileFingerprint.get(file_path=record.file_path)
    LOGGER.debug('loaded entry %d, fingerprint=%s', record.id,
                 self._fingerprint.fingerprint if self._fingerprint else None)

    # maps (section,footnotes_enabled) -> toc/footnote counter
    self._counters: typing.Dict[typing.Tuple[str, bool], markdown.ItemCounter] = {}
def scan_index():
    """ Create the database tables and index the content directory

    Calls publ.model.create_tables(), then passes config.content_directory
    to publ.index.scan_index() and to publ.index.background_scan(), in that
    order.
    """
    publ.model.create_tables()
    publ.index.scan_index(config.content_directory)
    publ.index.background_scan(config.content_directory)
def _startup(self):
    """ Startup routine for initiating the content indexer

    Inside the application context, sets up the model from
    self.publ_config.database_config. It then scans
    self.publ_config.content_folder and hands it to index.background_scan(),
    but only when there is no current click context or the click command
    is 'run'.
    """
    with self.app_context():
        model.setup(self.publ_config.database_config)

        import click

        # silent=True returns None instead of raising when no click command
        # is active
        ctx = click.get_current_context(silent=True)
        if not ctx or ctx.info_name == 'run':
            index.scan_index(self.publ_config.content_folder)
            index.background_scan(self.publ_config.content_folder)