Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
return HTTPFound(location=location)
else:
item.name = title_to_name(name,
blacklist=self.context.keys())
item.title = title
self.flash(_(u'Your changes have been saved.'), 'success')
return self.back('@@contents')
if 'cancel' in self.request.POST:
self.flash(_(u'No changes were made.'), 'info')
return self.back('@@contents')
ids = self._selected_children(add_context=False)
items = []
if ids is not None:
items = DBSession.query(Node).filter(Node.id.in_(ids)).all()
return {'items': items}
def delete_nodes(self):
    """
    Delete nodes view.

    Handles three situations, checked in this order:

    * the POST contains ``delete_nodes``: every id submitted as
      ``children-to-delete`` is looked up, a success message is flashed
      for it and the child of that name is removed from the context;
    * the POST contains ``cancel``: only an info message is flashed;
    * otherwise: the currently selected children are loaded (ordered by
      position) so the confirmation template can list them.

    :result: Either a redirect response or a dictionary passed to the
             template for rendering.
    :rtype: pyramid.httpexceptions.HTTPFound or dict
    """
    post = self.request.POST

    if 'delete_nodes' in post:
        doomed = post.getall('children-to-delete')
        if not doomed:
            self.flash(_(u"Nothing was deleted."), 'info')
        for node_id in doomed:
            node = DBSession.query(Node).get(node_id)
            # Flash first: the title is read before the node is removed.
            message = _(u'${title} was deleted.',
                        mapping=dict(title=node.title))
            self.flash(message, 'success')
            del self.context[node.name]
        return self.back('@@contents')

    if 'cancel' in post:
        self.flash(_(u'No changes were made.'), 'info')
        return self.back('@@contents')

    selected = self._selected_children(add_context=False)
    if selected is None:
        items = []
    else:
        query = DBSession.query(Node).filter(Node.id.in_(selected))
        items = query.order_by(Node.position).all()
    return {
        'items': items,
        'states': _states(self.context, self.request),
    }
def get(file_id: str) -> DBStoredFile:
    """Returns the file given by the file_id

    :param file_id: the unique id associated to the file
    :type file_id: string
    :result: a :class:`kotti.filedepot.DBStoredFile` instance
    :rtype: :class:`kotti.filedepot.DBStoredFile`
    :raises IOError: if no stored file has the given ``file_id``
    """
    stored = DBSession.query(DBStoredFile).filter_by(file_id=file_id).first()
    if stored is None:
        # Same exception type as before; the message names the missing id
        # so the failure can be diagnosed from a traceback or log line.
        raise IOError("File {} does not exist".format(file_id))
    return stored
def set_visibility(self, show):
    """ Set the navigation visibility of the selected nodes. Called by the
    show and the hide view.

    Only nodes whose ``in_navigation`` flag differs from ``show`` are
    updated, and a success message is flashed for each of them.

    :param show: the value to store in ``in_navigation``.
    :result: Redirect response to the referrer of the request, or ``None``
             when the request is an XHR request.
    :rtype: pyramid.httpexceptions.HTTPFound
    """
    for node_id in self._selected_children():
        node = DBSession.query(Node).get(node_id)
        if node.in_navigation == show:
            # Already in the requested state: nothing to change or report.
            continue

        node.in_navigation = show
        mapping = dict(title=node.title)
        message = (
            _("${title} is now visible in the navigation.", mapping=mapping)
            if show
            else _(
                "${title} is no longer visible in the navigation.",
                mapping=mapping,
            )
        )
        self.flash(message, "success")

    if self.request.is_xhr:
        return None
    return self.back()
def nodes_tree(request, context=None, permission="view"):
    """Build a :class:`NodesTree` from all ``Content`` rows.

    Every queried node is recorded in the id mapping, but a node is only
    listed among its parent's children when ``request.has_permission``
    grants ``permission`` on it.  Each list of children is sorted by
    ``position``.

    :param request: the current request, used for the permission checks.
    :param context: node to use as the root of the tree; when ``None`` the
                    first permitted node without a parent is used.
    :param permission: name of the permission a node needs to be listed.
    :result: the tree rooted at the chosen node.
    :raises IndexError: if ``context`` is ``None`` and no permitted node
                        without a parent exists.
    """
    by_id = {}
    children_of = defaultdict(list)

    for content in DBSession.query(Content).with_polymorphic(Content):
        by_id[content.id] = content
        if not request.has_permission(permission, content):
            continue
        children_of[content.parent_id].append(content)

    for siblings in children_of.values():
        siblings.sort(key=lambda sibling: sibling.position)

    root = children_of[None][0] if context is None else context
    return NodesTree(root, request, by_id, children_of, permission)
def _tag_find_or_create(cls, title: str) -> "TagsToContents":
    """
    Find or create a tag with the given title.

    :param title: Title of the tag to find or create.
    :type title: str
    :result: A new ``cls`` instance referencing the existing tag with that
             title, or a freshly created one if none was found.
    :rtype: :class:`~kotti.resources.TagsToContents`
    """
    # The lookup runs with autoflush switched off on the session.
    with DBSession.no_autoflush:
        existing = DBSession.query(Tag).filter_by(title=title).first()

    tag = Tag(title=title) if existing is None else existing
    return cls(tag=tag)
def _update_children_paths(old_parent_path, new_parent_path):
    """Rewrite the ``path`` of every node stored below ``old_parent_path``.

    For each node whose path starts with ``old_parent_path``, that prefix
    is replaced by ``new_parent_path``.  A node whose path already equals
    ``new_parent_path`` is left alone.

    :param old_parent_path: path prefix to replace.
    :param new_parent_path: path prefix to put in its place.
    """
    affected = (
        DBSession.query(Node)
        .options(load_only("path", "type"))
        .filter(Node.path.startswith(old_parent_path))
        .order_by(Node.path)
    )
    prefix_length = len(old_parent_path)

    for node in affected:
        # A node already sitting at the new path is the renamed node
        # itself; only the remaining matches need their prefix swapped.
        if node.path != new_parent_path:
            node.path = new_parent_path + node.path[prefix_length:]
self.flash(
_("${title} was deleted.", mapping=dict(title=item.title)),
"success",
)
del self.context[item.name]
return self.back("@@contents")
if "cancel" in self.request.POST:
self.flash(_("No changes were made."), "info")
return self.back("@@contents")
ids = self._selected_children(add_context=False)
items = []
if ids is not None:
items = (
DBSession.query(Node)
.filter(Node.id.in_(ids))
.order_by(Node.position)
.all()
)
return {"items": items, "states": _states(self.context, self.request)}
# Handle on the ``files`` table, declared by name only.
# NOTE(review): ``files.c.data`` below implies the columns are already known
# to ``metadata`` (e.g. reflected earlier) -- confirm against the caller.
files = sa.Table('files', metadata)
files.c.data.type = sa.LargeBinary() # this restores to old column type
# Default depot as reported by DepotManager; process() passes it on as the
# 'depot_name' of each uploaded file.
dn = DepotManager.get_default()
# Filled by process(): one {'nodeid': ..., 'data': ...} dict per migrated row.
_saved = []
def process(thing):
    """Store one ``files`` row in the depot and remember its encoded value.

    :param thing: a 4-item row ``(id, data, filename, mimetype)``.

    The encoded ``UploadedFile`` is appended to the enclosing ``_saved``
    list together with the node id; nothing is returned.
    """
    node_id, blob, name, content_type = thing

    stored = UploadedFile({'depot_name': dn, 'files': []})
    # NOTE(review): presumably needed so the instance accepts new content;
    # confirm against the depot ``UploadedFile`` implementation.
    stored._thaw()
    stored.process_content(blob, filename=name, content_type=content_type)

    record = {'nodeid': node_id, 'data': stored.encode()}
    _saved.append(record)
    log.info("Saved data for node id {}".format(node_id))
# Select only the columns process() unpacks, ordered by id so consecutive
# slices neither overlap nor skip rows.
query = DBSession.query(
    files.c.id, files.c.data, files.c.filename, files.c.mimetype
).order_by(files.c.id).yield_per(10)

# Rows are fetched in fixed-size windows so that only ``window_size`` blobs
# are loaded per query.
window_size = 10
window_idx = 0

log.info("Starting migration of blob data")
now = time.time()  # start timestamp; not read within this block

while True:
    start = window_size * window_idx
    things = query.slice(start, start + window_size).all()
    # ``all()`` returns a list, never None: an empty list is the signal
    # that the previous window was the last one.
    if not things:
        break
    for thing in things:
        process(thing)
    # Advance to the next window; without this the first window would be
    # fetched and processed again on every iteration.
    window_idx += 1
def _do_copy_or_cut(self, action, action_title):
    """Remember the selected children for a later paste.

    The selected ids and ``action`` are stored in the session under
    ``"kotti.paste"``, then a success message is flashed for every
    selected node.

    :param action: value stored next to the ids in the session.
    :param action_title: text interpolated into the flashed message.
    :result: Redirect response to the referrer of the request, or ``None``
             when the request is an XHR request.
    """
    selected = self._selected_children()
    self.request.session["kotti.paste"] = (selected, action)

    for node_id in selected:
        node = DBSession.query(Node).get(node_id)
        message = _(
            "${title} was %s." % action_title,
            mapping=dict(title=node.title),
        )
        self.flash(message, "success")

    if self.request.is_xhr:
        return None
    return self.back()