Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
self.name = name
# Flask expects template filenames to be /-separated regardless of
# platform
if os.sep != '/':
self.filename = '/'.join(os.path.normpath(filename).split(os.sep))
else:
self.filename = filename
self.file_path = file_path
if file_path:
self.mtime = os.stat(file_path).st_mtime
self.last_modified = arrow.get(self.mtime)
self._fingerprint = utils.file_fingerprint(file_path)
self.content = content
if content:
self._fingerprint = hashlib.md5(content.encode('utf-8')).hexdigest()
def set_fingerprint(fullpath, fingerprint=None):
""" Set the last known modification time for a file """
try:
fingerprint = fingerprint or utils.file_fingerprint(fullpath)
record = model.FileFingerprint.get(file_path=fullpath)
if record and record.fingerprint != fingerprint:
record.set(fingerprint=fingerprint,
file_mtime=os.stat(fullpath).st_mtime)
elif not record:
record = model.FileFingerprint(
file_path=fullpath,
fingerprint=fingerprint,
file_mtime=os.stat(fullpath).st_mtime)
orm.commit()
except FileNotFoundError:
orm.delete(fp for fp in model.FileFingerprint if fp.file_path == fullpath)
def _get_asset(file_path):
""" Get the database record for an asset file """
record = model.Image.get(file_path=file_path)
fingerprint = ','.join((utils.file_fingerprint(file_path),
str(RENDITION_VERSION)))
if not record or record.fingerprint != fingerprint:
# Reindex the file
LOGGER.info("Updating image %s -> %s", file_path, fingerprint)
# compute the md5sum; from https://stackoverflow.com/a/3431838/318857
md5 = hashlib.md5()
md5.update(bytes(RENDITION_VERSION))
with open(file_path, 'rb') as file:
for chunk in iter(lambda: file.read(16384), b""):
md5.update(chunk)
values = {
'file_path': file_path,
'checksum': md5.hexdigest(),
'fingerprint': fingerprint,
def scan_file(fullpath: str, relpath: typing.Optional[str], fixup_pass: int) -> bool:
""" scan a file and put it into the index
:param fullpath str: The full file path
:param relpath typing.Optional[str]: The file path relative to the content
root; if None, this will be inferred
:param fixup_pass int: Which iteration of fixing-up we're on
"""
# pylint: disable=too-many-branches,too-many-statements,too-many-locals
try:
check_fingerprint = utils.file_fingerprint(fullpath)
entry = load_message(fullpath)
except FileNotFoundError:
# The file doesn't exist, so remove it from the index
record = model.Entry.get(file_path=fullpath)
if record:
expire_record(record)
return True
entry_id = get_entry_id(entry, fullpath, fixup_pass > 0)
if entry_id is None:
return False
fixup_needed = False
if not relpath:
relpath = os.path.relpath(fullpath, config.content_folder)
def save_file(fullpath: str, entry: email.message.Message, fingerprint: str):
""" Save a message file out, without mangling the headers """
from atomicwrites import atomic_write
with atomic_write(fullpath, overwrite=True) as file:
# we can't just use file.write(str(entry)) because otherwise the
# headers "helpfully" do MIME encoding normalization.
# str(val) is necessary to get around email.header's encoding
# shenanigans
for key, val in entry.items():
print(f'{key}: {str(val)}', file=file)
print('', file=file)
file.write(entry.get_payload())
if utils.file_fingerprint(fullpath) != fingerprint:
LOGGER.warning("File %s changed during atomic write; aborting", fullpath)
raise RuntimeError("File changed during reindex")
return True
def scan_directory(root, files):
""" Helper function to scan a single directory """
LOGGER.debug("scanning directory %s", root)
try:
for file in files:
fullpath = os.path.join(root, file)
relpath = os.path.relpath(fullpath, content_dir)
if not is_scannable(fullpath):
continue
fingerprint = utils.file_fingerprint(fullpath)
last_fingerprint = get_last_fingerprint(fullpath)
if fingerprint != last_fingerprint:
LOGGER.debug("%s: %s -> %s", fullpath, last_fingerprint, fingerprint)
indexer.scan_file(fullpath, relpath, 0, wait_start)
except Exception: # pylint:disable=broad-except
LOGGER.exception("Got error parsing directory %s", root)
def _get_asset(file_path):
""" Get the database record for an asset file """
record = model.Image.get(file_path=file_path)
fingerprint = ','.join((utils.file_fingerprint(file_path),
str(RENDITION_VERSION)))
if not record or record.fingerprint != fingerprint:
# Reindex the file
logger.info("Updating image %s -> %s", file_path, fingerprint)
# compute the md5sum; from https://stackoverflow.com/a/3431838/318857
md5 = hashlib.md5()
md5.update(bytes(RENDITION_VERSION))
with open(file_path, 'rb') as file:
for chunk in iter(lambda: file.read(16384), b""):
md5.update(chunk)
values = {
'file_path': file_path,
'checksum': md5.hexdigest(),
'fingerprint': fingerprint,