Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if not is_scannable(fullpath):
continue
fingerprint = utils.file_fingerprint(fullpath)
last_fingerprint = get_last_fingerprint(fullpath)
if fingerprint != last_fingerprint:
LOGGER.debug("%s: %s -> %s", fullpath, last_fingerprint, fingerprint)
indexer.scan_file(fullpath, relpath, 0, wait_start)
except Exception: # pylint:disable=broad-except
LOGGER.exception("Got error parsing directory %s", root)
for root, _, files in os.walk(content_dir, followlinks=True):
indexer.submit(scan_directory, root, files)
for table in (model.Entry, model.Category, model.Image, model.FileFingerprint):
indexer.submit(prune_missing, table)
def _next(**kwargs) -> typing.Optional["Entry"]:
    """ Find the first following entry that the active user may view.

    The keyword arguments are layered on top of the default pagination spec
    to build the query; candidates located after this entry are then walked
    in (local_date, id) order.

    Returns the loaded Entry for the first authorized record, or None when
    no candidate is authorized.
    """
    criteria = self._pagination_default_spec(kwargs)
    criteria.update(kwargs)

    candidates = queries.where_after_entry(
        queries.build_query(criteria), self._record)

    viewer = user.get_active()
    ordered = candidates.order_by(model.Entry.local_date, model.Entry.id)
    for candidate in ordered:
        if not candidate.is_authorized(viewer):
            # Skipped records are logged and reported through tokens.request
            LOGGER.debug("User unauthorized for entry %d", candidate.id)
            tokens.request(viewer)
            continue
        return Entry.load(candidate)
    return None
return CallableProxy(_next)
def latest_entry():
    """ Return the ID of the entry with the greatest utc_date, or None.

    Used as a cache-busting value: the unfiltered default query is sorted by
    descending utc_date and only its first row is consulted.
    """
    newest = (queries.build_query({})
              .order_by(orm.desc(model.Entry.utc_date))
              .first())
    if not newest:
        return None

    LOGGER.debug("Most recently-scheduled entry: %s", newest)
    return newest.id
class Category(DbEntity):
    """ Metadata for a category

    No explicit primary key is declared on this entity.
    """
    # Optional string naming the category
    category = orm.Optional(str)
    # Required string: path of the file associated with this record
    # NOTE(review): presumably the category's metadata file -- confirm
    file_path = orm.Required(str)
    # Optional string; by its name, an alternate name used for sorting
    sort_name = orm.Optional(str)
    # Set relation to PathAlias (declared by name); the other side is
    # PathAlias.category
    aliases = orm.Set("PathAlias")
class PathAlias(DbEntity):
    """ Path alias mapping

    Each row is keyed by the aliased path and may reference an entry, a
    category and/or a template name; all three targets are optional.
    """
    # The aliased path; primary key, so each path maps to at most one row
    path = orm.PrimaryKey(str)
    # Optional reference to an Entry
    entry = orm.Optional(Entry)
    # Optional reference to a Category (other side: Category.aliases)
    category = orm.Optional(Category)
    # Optional template name stored as a string
    template = orm.Optional(str)
    alias_type = orm.Required(int) # reflects AliasType (stored as its integer value)
class Image(DbEntity):
    """ Image metadata

    One row per image file, keyed by its path.
    """
    # Path of the image file; primary key
    file_path = orm.PrimaryKey(str)
    # Required string checksum of the image
    # NOTE(review): hash algorithm is not visible here -- confirm at the writer
    checksum = orm.Required(str)
    # Required string fingerprint of the file
    # NOTE(review): presumably used for change detection -- confirm
    fingerprint = orm.Required(str)
    # Optional integer dimensions
    width = orm.Optional(int)
    height = orm.Optional(int)
    # Optional flag; by its name, whether the image has transparency
    transparent = orm.Optional(bool)
    # Required flag, False unless set explicitly
    is_asset = orm.Required(bool, default=False)
return None
if not entry_id:
# See if we already have an entry with this file path
by_filepath = model.Entry.select(lambda e: e.file_path == fullpath).first()
if by_filepath:
entry_id = by_filepath.id
if not entry_id:
# We still don't have an ID; generate one pseudo-randomly, based on the
# entry file path. This approach averages around 0.25 collisions per ID
# generated while keeping the entry ID reasonably short. In general,
# count*N averages 1/(N-1) collisions per ID.
limit = max(10, orm.get(orm.count(e)
for e in model.Entry) * 5) # type:ignore
attempt = 0
while not entry_id or model.Entry.get(id=entry_id):
# Stably generate a quasi-random entry ID from the file path
md5 = hashlib.md5()
md5.update(f"{fullpath} {attempt}".encode('utf-8'))
entry_id = int.from_bytes(md5.digest(), byteorder='big') % limit
attempt = attempt + 1
if other_entry:
LOGGER.warning("Entry '%s' had ID %d, which belongs to '%s'. Reassigned to %d",
fullpath, other_entry.id, other_entry.file_path, entry_id)
return entry_id
class EntryAuth(DbEntity):
    """ An authentication record for an entry

    Each row pairs a user group with an allowed/denied flag for one entry,
    at a given position in that entry's rule list.
    """
    # Position of this rule among the entry's rules
    # NOTE(review): presumably rules are evaluated in ascending order -- confirm
    order = orm.Required(int)
    # The entry this rule applies to
    entry = orm.Required(Entry)
    # Name of the user group the rule matches
    user_group = orm.Required(str)
    # Whether a matching user is allowed
    allowed = orm.Required(bool)
    # Composite key over (entry, order): an entry has one rule per position
    orm.composite_key(entry, order)
class AuthLog(DbEntity):
    """ Authentication log for private entries

    The primary key is (entry, user), so there is at most one row per
    entry/user pair.
    """
    # Timestamp of the logged access; indexed
    date = orm.Required(datetime.datetime, index=True)
    # The entry that was accessed; indexed
    entry = orm.Required(Entry, index=True)
    # Identifier of the user as a string; indexed
    user = orm.Required(str, index=True)
    # Optional string describing the user's groups
    # NOTE(review): serialization format is not visible here -- confirm
    user_groups = orm.Optional(str)
    # Whether the access was authorized; indexed
    authorized = orm.Required(bool, index=True)
    # Composite primary key over (entry, user)
    orm.PrimaryKey(entry, user)
class KnownUser(DbEntity):
    """ Users who are known to the system

    One row per user, keyed by the user's string identifier.
    """
    # User identifier; primary key
    user = orm.PrimaryKey(str)
    # Timestamp of when the user was last seen; required and indexed
    last_seen = orm.Required(datetime.datetime, index=True)
def reset():
    """ Completely reset the database """
    # NOTE(review): only the log call is visible in this excerpt; the steps
    # that actually rebuild the schema are not shown here -- confirm
    LOGGER.info("Rebuilding schema")
def get_entry(entry):
    """ Resolve an entry reference to its database record.

    Accepts, in order of precedence:
      * a model.Entry, which is returned unchanged
      * any object exposing a `_record` attribute, whose `_record` is returned
      * an int or str, which is converted with int() and looked up by ID

    Raises ValueError for any other type.
    """
    if isinstance(entry, model.Entry):
        resolved = entry
    elif hasattr(entry, '_record'):
        resolved = entry._record
    elif isinstance(entry, (int, str)):
        resolved = model.Entry.get(id=int(entry))
    else:
        raise ValueError(f"entry is of unknown type {type(entry)}")
    return resolved
else:
if user and auth.user_group in user.auth_groups:
result = auth.allowed
LOGGER.debug(" result->%s", result)
LOGGER.debug("Final result: %s", result)
return result
class EntryTag(DbEntity):
    """ Tags for an entry

    One row per tag, keyed by `key`, with a set relation to the tagged
    entries.
    """
    # Lookup key for the tag; primary key
    # NOTE(review): presumably a normalized form of `name` -- confirm
    key = orm.PrimaryKey(str)
    # Required string name of the tag
    name = orm.Required(str)
    # Set relation to the Entry records carrying this tag
    entries = orm.Set(Entry)
class Category(DbEntity):
    """ Metadata for a category

    No explicit primary key is declared on this entity.
    """
    # Optional string naming the category
    category = orm.Optional(str)
    # Required string: path of the file associated with this record
    # NOTE(review): presumably the category's metadata file -- confirm
    file_path = orm.Required(str)
    # Optional string; by its name, an alternate name used for sorting
    sort_name = orm.Optional(str)
    # Set relation to PathAlias, declared by name
    aliases = orm.Set("PathAlias")
class PathAlias(DbEntity):
""" Path alias mapping """
path = orm.PrimaryKey(str)
entry = orm.Optional(Entry)