Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_principals_with_local_roles(self):
session = DBSession()
root = get_root()
child = root[u'child'] = Node()
session.flush()
self.assertEqual(principals_with_local_roles(root), [])
self.assertEqual(principals_with_local_roles(child), [])
self.assertEqual(map_principals_with_local_roles(root), [])
self.assertEqual(map_principals_with_local_roles(child), [])
set_groups('group:bobsgroup', child, ['role:editor'])
set_groups('bob', root, ['group:bobsgroup'])
set_groups('group:franksgroup', root, ['role:editor'])
self.assertEqual(
set(principals_with_local_roles(child)),
set(['bob', 'group:bobsgroup', 'group:franksgroup'])
)
self.assertEqual(
session.add(child1)
self.assertEquals(root.keys(), [u'child1'])
self.assertEquals(root[u'child1'], child1)
del root[u'child1']
self.assertEquals(root.keys(), [])
# When we delete a parent node, all its child nodes will be
# released as well:
root[u'child2'] = Node()
root[u'child2'][u'subchild'] = Node()
self.assertEquals(
session.query(Node).filter(Node.name == u'subchild').count(), 1)
del root[u'child2']
self.assertEquals(
session.query(Node).filter(Node.name == u'subchild').count(), 0)
# We can pass a tuple as the key to more efficiently reach
# down to child objects:
root[u'child3'] = Node()
subchild33 = Node(name=u'subchild33', parent=root[u'child3'])
session.add(subchild33)
self.assertTrue(
root[u'child3', u'subchild33'] is root[u'child3'][u'subchild33'])
self.assertTrue(
root[(u'child3', u'subchild33')] is subchild33)
self.assertRaises(KeyError, root.__getitem__, (u'child3', u'bad-name'))
del root[u'child3']
# Overwriting an existing Node is an error; first delete manually!
child4 = Node(name=u'child4', parent=root)
session.add(child4)
def upgrade():
    """Add a ``path`` column to the ``nodes`` table and fill it in.

    The column is ``Unicode(1000)``.  It is created with ``index=True`` on
    every dialect except MySQL, where it is added without an index
    (presumably because of MySQL index key-length limits -- TODO confirm).

    After the column exists, every node gets its ``path`` set to the
    ``__name__`` values of its lineage, ordered from the outermost ancestor
    down to the node itself and joined with ``/``.  If that join yields an
    empty string, the path is set to ``/`` instead.
    """
    from alembic.context import get_bind

    bind = get_bind()
    on_mysql = bind.engine.dialect.name == 'mysql'
    # Only non-MySQL backends get the index flag; MySQL gets no extra options.
    column_options = {} if on_mysql else {'index': True}
    op.add_column(
        'nodes', sa.Column('path', sa.Unicode(1000), **column_options))

    from kotti import DBSession
    from kotti.resources import Node

    for node in DBSession.query(Node).with_polymorphic([Node]):
        # lineage() yields the node first and its ancestors afterwards, so
        # flip the order to build the path from the top down.
        names = [ancestor.__name__ for ancestor in lineage(node)]
        names.reverse()
        node.path = u'/'.join(names) or u'/'
def downgrade():
    """Drop the last character from the ``path`` of every non-root node.

    Nodes whose path is exactly ``/`` are left untouched.  For all other
    nodes the final character is cut off; it is expected to be the trailing
    ``/`` (assumption based on the matching upgrade -- verify).
    """
    from kotti import DBSession
    from kotti.resources import Node

    every_node = DBSession.query(Node).with_polymorphic([Node])
    for item in every_node:
        if item.path == u'/':
            # The root keeps its single-slash path as it is.
            continue
        item.path = item.path[:-1]
def clear(self) -> None:
    """Delete every ``Node`` whose ``parent`` is this object.

    The deletion is issued as a single query-level ``delete()`` through
    ``DBSession``.
    """
    direct_children = DBSession.query(Node).filter(Node.parent == self)
    direct_children.delete()
def wire_sqlalchemy():  # pragma: no cover
    """Register the SQLAlchemy event listeners, at most once.

    The module-level flag ``_WIRED_SQLALCHMEY`` records whether the
    listeners are already installed; later calls return immediately.

    Listeners registered:

    - ``after_delete`` on ``mapper`` -> ``_after_delete``
    - ``before_flush`` on ``DBSession`` -> ``_before_flush``
    - ``set`` on ``Node.name`` -> ``_set_path_for_new_name``
    - ``set`` on ``Node.parent`` -> ``_set_path_for_new_parent``

    The two ``set`` listeners are registered with ``propagate=True``.
    """
    global _WIRED_SQLALCHMEY
    if _WIRED_SQLALCHMEY:
        return
    _WIRED_SQLALCHMEY = True

    listen = sqlalchemy.event.listen
    listen(mapper, "after_delete", _after_delete)
    listen(DBSession, "before_flush", _before_flush)

    # Keep the 'path' attribute up to date whenever 'name' or 'parent'
    # is assigned.
    path_listeners = (
        (Node.name, _set_path_for_new_name),
        (Node.parent, _set_path_for_new_parent),
    )
    for attribute, handler in path_listeners:
        listen(attribute, "set", handler, propagate=True)
lambda session: session.query(Node)
.with_polymorphic(Node)
.add_columns(Node.id)
.enable_eagerloads(False)
.filter(Node.parent_id == None)
)
"""
:param root: The node where traversal should start
:type root: :class:`kotti.resources.Node`
:param vpath_tuple: Tuple of path segments to be traversed
:type vpath_tuple: tuple
:return: List of nodes, from root (excluded) to context (included).
Each node has its parent set already, so that no subsequent
queries will be performed, e.g. when calling
``lineage(context)``
:rtype: list of :class:`kotti.resources.Node`
"""
conditions = [
(Node.path == root.path + "/".join(vpath_tuple[: idx + 1]) + "/")
for idx, item in enumerate(vpath_tuple)
]
nodes = (
DBSession().query(Node).order_by(Node.path).filter(or_(*conditions)).all()
)
for i, node in enumerate(nodes):
if i == 0:
setattr(node, "parent", root)
else:
setattr(node, "parent", nodes[i - 1])
return nodes
def list_children(self, context=None, permission="view"):
    """Return the children of ``context``, optionally filtered by permission.

    :param context: Object whose children are listed; ``self.context`` is
                    used when this is ``None``.
    :param permission: Permission name used for filtering.  For ``Node``
                       contexts, ``None`` returns ``context.children``
                       unfiltered; otherwise the result comes from
                       ``context.children_with_permission``.  For any other
                       context, a falsy value disables filtering.
    :return: For ``Node`` contexts, whatever ``children`` /
             ``children_with_permission`` return; otherwise a list built
             from ``context.values()`` (empty if there is no ``values``
             attribute).
    """
    target = self.context if context is None else context

    if not isinstance(target, Node):
        # Generic containers: use values() if present and check each child
        # against the request individually.
        get_values = getattr(target, "values", lambda: [])
        allowed = []
        for child in get_values():
            if not permission or self.request.has_permission(permission, child):
                allowed.append(child)
        return allowed

    if permission is None:
        return target.children
    return target.children_with_permission(self.request, permission)
)
return HTTPFound(location=location)
else:
item.name = title_to_name(name, blacklist=self.context.keys())
item.title = title
self.flash(_("Your changes have been saved."), "success")
return self.back("@@contents")
if "cancel" in self.request.POST:
self.flash(_("No changes were made."), "info")
return self.back("@@contents")
ids = self._selected_children(add_context=False)
items = []
if ids is not None:
items = DBSession.query(Node).filter(Node.id.in_(ids)).all()
return {"items": items}