Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
remove_attach = []
for attach in record.attachments:
if attach not in set_attach:
remove_attach.append(attach)
LOGGER.debug("set_attach %s remove_attach %s", set_attach, remove_attach)
for attach in remove_attach:
record.attachments.remove(attach)
for attach in set_attach:
record.attachments.add(attach)
orm.commit()
# do final fixups
if record.status == model.PublishStatus.DRAFT.value:
LOGGER.info("Not touching draft entry %s", fullpath)
elif fixup_needed:
LOGGER.info("Fixing up entry %s", fullpath)
result = save_file(fullpath, entry, check_fingerprint)
return result
result = handle_path_alias()
if result:
return result
LOGGER.info("Attempted to retrieve nonexistent entry %d", entry_id)
raise http_error.NotFound("No such entry")
return render_entry_record(record, category, None)
# Maps an entry's publish-status value to the HTTP error raised for it.
# render_entry_record looks up record.status here and raises the mapped
# exception; statuses that are not keys skip this check.
# NOTE(review): the values are exception instances created once when this
# mapping is evaluated, so every raise reuses the same object.
STATUS_EXCEPTIONS = {
    # Draft entries are a 403 with a custom error
    model.PublishStatus.DRAFT.value: http_error.Forbidden("Entry not available"),
    # GONE entries raise Gone with its default message
    model.PublishStatus.GONE.value: http_error.Gone(),
    # ILLEGAL entries raise UnavailableForLegalReasons with its default message
    model.PublishStatus.ILLEGAL.value: http_error.UnavailableForLegalReasons(),
    # TEAPOT entries raise ImATeapot with its default message
    model.PublishStatus.TEAPOT.value: http_error.ImATeapot(),
}
def render_entry_record(record: model.Entry, category: str, template: typing.Optional[str],
_mounted=False):
""" Render an entry object """
if record.status in STATUS_EXCEPTIONS:
raise STATUS_EXCEPTIONS[record.status]
# If the entry is private and the user isn't logged in, redirect
result = _check_authorization(record, category)
if result:
return result
# It's not a valid entry, so see if it's a redirection
result = handle_path_alias()
if result:
return result
LOGGER.info("Attempted to retrieve nonexistent entry %d", entry_id)
raise http_error.NotFound("No such entry")
return render_entry_record(record, category, None)
# Maps an entry's publish-status value to the HTTP error raised for it.
# render_entry_record looks up record.status here and raises the mapped
# exception; statuses that are not keys skip this check.
# NOTE(review): the values are exception instances created once when this
# mapping is evaluated, so every raise reuses the same object.
STATUS_EXCEPTIONS = {
    # Draft entries are a 403 with a custom error
    model.PublishStatus.DRAFT.value: http_error.Forbidden("Entry not available"),
    # GONE entries raise Gone with its default message
    model.PublishStatus.GONE.value: http_error.Gone(),
    # ILLEGAL entries raise UnavailableForLegalReasons with its default message
    model.PublishStatus.ILLEGAL.value: http_error.UnavailableForLegalReasons(),
    # TEAPOT entries raise ImATeapot with its default message
    model.PublishStatus.TEAPOT.value: http_error.ImATeapot(),
}
def render_entry_record(record: model.Entry, category: str, template: typing.Optional[str],
_mounted=False):
""" Render an entry object """
if record.status in STATUS_EXCEPTIONS:
raise STATUS_EXCEPTIONS[record.status]
# If the entry is private and the user isn't logged in, redirect
result = _check_authorization(record, category)
if result:
return result
def where_entry_visible_future(query):
    """ Restrict a query to entries that are visible now or in the future

    Keeps only the entries whose status is PUBLISHED or SCHEDULED.
    """
    published = model.PublishStatus.PUBLISHED.value
    scheduled = model.PublishStatus.SCHEDULED.value
    return orm.select(
        entry for entry in query
        if entry.status in (published, scheduled))
def visible(self) -> bool:
    """ True unless this entry's status is DRAFT or GONE """
    current_status = self.status
    hidden_statuses = (PublishStatus.DRAFT.value, PublishStatus.GONE.value)
    return current_status not in hidden_statuses
def where_entry_deleted(query):
    """ Restrict a query to entries that have been deleted (status GONE) """
    gone = model.PublishStatus.GONE.value
    return orm.select(entry for entry in query if entry.status == gone)
def visible(self) -> bool:
    """ True unless this entry's status is DRAFT or GONE """
    current_status = self.status
    hidden_statuses = (PublishStatus.DRAFT.value, PublishStatus.GONE.value)
    return current_status not in hidden_statuses
if not record:
# It's not a valid entry, so see if it's a redirection
result = handle_path_alias()
if result:
return result
LOGGER.info("Attempted to retrieve nonexistent entry %d", entry_id)
raise http_error.NotFound("No such entry")
return render_entry_record(record, category, None)
# Maps an entry's publish-status value to the HTTP error raised for it.
# render_entry_record looks up record.status here and raises the mapped
# exception; statuses that are not keys skip this check.
# NOTE(review): the values are exception instances created once when this
# mapping is evaluated, so every raise reuses the same object.
STATUS_EXCEPTIONS = {
    # Draft entries are a 403 with a custom error
    model.PublishStatus.DRAFT.value: http_error.Forbidden("Entry not available"),
    # GONE entries raise Gone with its default message
    model.PublishStatus.GONE.value: http_error.Gone(),
    # ILLEGAL entries raise UnavailableForLegalReasons with its default message
    model.PublishStatus.ILLEGAL.value: http_error.UnavailableForLegalReasons(),
    # TEAPOT entries raise ImATeapot with its default message
    model.PublishStatus.TEAPOT.value: http_error.ImATeapot(),
}
def render_entry_record(record: model.Entry, category: str, template: typing.Optional[str],
_mounted=False):
""" Render an entry object """
if record.status in STATUS_EXCEPTIONS:
raise STATUS_EXCEPTIONS[record.status]
# If the entry is private and the user isn't logged in, redirect
result = _check_authorization(record, category)
""" Get or generate an entry ID for an entry """
other_entry: typing.Optional[model.Entry] = None
try:
entry_id = int(entry['Entry-ID']) if 'Entry-ID' in entry else None
except (ValueError, KeyError, TypeError) as err:
LOGGER.debug("Invalid entry-id: %s", err)
# See if we've inadvertently duplicated an entry ID
if entry_id is not None:
try:
other_entry = model.Entry.get(id=entry_id)
if (other_entry
and os.path.isfile(other_entry.file_path)
and not os.path.samefile(other_entry.file_path, fullpath)
and other_entry.status != model.PublishStatus.DRAFT.value):
entry_id = None
else:
other_entry = None
except FileNotFoundError:
# the other file doesn't exist, so just let it go
pass
# Do we need to assign a new ID?
if not entry_id and not assign_id:
# We're not assigning IDs yet
return None
if not entry_id:
# See if we already have an entry with this file path
by_filepath = model.Entry.select(lambda e: e.file_path == fullpath).first()
if by_filepath:
if not tmpl:
raise http_error.BadRequest("Missing entry template" + entry_template)
rendered, etag = render_publ_template(
tmpl,
entry=entry_obj,
category=Category.load(category))
if request.if_none_match.contains(etag):
return 'Not modified', 304
headers = {
'Content-Type': entry_obj.get('Content-Type', mime_type(tmpl)),
'ETag': etag
}
if record.status == model.PublishStatus.HIDDEN.value:
headers = {**headers, **NO_CACHE}
return rendered, headers