Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _populator():
from kotti.populate import populate
populate()
for doc in DBSession.query(Document)[1:]:
DBSession.delete(doc)
transaction.commit()
def test_inherit(self):
session = DBSession()
root = get_root()
child = root[u'child'] = Node()
session.flush()
self.assertEqual(list_groups('bob', child), [])
set_groups('bob', root, ['role:editor'])
self.assertEqual(list_groups('bob', child), ['role:editor'])
# Groups from the child are added:
set_groups('bob', child, ['group:somegroup'])
self.assertEqual(
set(list_groups('bob', child)),
set(['group:somegroup', 'role:editor'])
)
# We can ask to list only those groups that are defined locally:
def test_rename_to_empty_name(self):
from kotti.views.edit import move_node
root = DBSession().query(Node).get(1)
child = root['child'] = Document(title=u"Child")
request = DummyRequest()
request.params['rename'] = u'on'
request.params['name'] = u''
request.params['title'] = u'foo'
move_node(child, request)
self.assertEqual(request.session.pop_flash('error'),
[u'Name and title are required.'])
to_state, )
self.flash(_(u'Your changes have been saved.'), 'success')
else:
self.flash(_(u'No changes were made.'), 'info')
return self.back('@@contents')
if 'cancel' in self.request.POST:
self.flash(_(u'No changes were made.'), 'info')
return self.back('@@contents')
ids = self._selected_children(add_context=False)
items = transitions = []
if ids is not None:
wf = get_workflow(self.context)
if wf is not None:
items = DBSession.query(Node).filter(Node.id.in_(ids)).all()
for item in items:
trans_info = wf.get_transitions(item, self.request)
for tran_info in trans_info:
if tran_info not in transitions:
transitions.append(tran_info)
return {'items': items,
'states': _states(self.context, self.request),
'transitions': transitions, }
def read(self, n: int = -1) -> bytes:
"""Reads ``n`` bytes from the file.
If ``n`` is not specified or is ``-1`` the whole
file content is read in memory and returned
"""
if self._data is _marker:
file_id = DBSession.merge(self).file_id
self._data = (
DBSession.query(DBStoredFile.data).filter_by(file_id=file_id).scalar()
)
if n == -1:
result = self._data[self._cursor:]
else:
result = self._data[self._cursor:self._cursor + n]
self._cursor += len(result)
return result
def delete_orphaned_tags(event):
"""Delete Tag instances / records when they are not associated with any
content.
:param event: event that triggered this handler.
:type event: :class:`ObjectAfterDelete`
"""
# noinspection PyUnresolvedReferences
DBSession.query(Tag).filter(~Tag.content_tags.any()).delete(
synchronize_session=False
)
:param vpath_tuple: Tuple of path segments to be traversed
:type vpath_tuple: tuple
:return: List of nodes, from root (excluded) to context (included).
Each node has its parent set already, so that no subsequent
queries will be be performed, e.g. when calling
``lineage(context)``
:rtype: list of :class:`kotti.resources.Node`
"""
conditions = [
(Node.path == root.path + "/".join(vpath_tuple[: idx + 1]) + "/")
for idx, item in enumerate(vpath_tuple)
]
nodes = (
DBSession().query(Node).order_by(Node.path).filter(or_(*conditions)).all()
)
for i, node in enumerate(nodes):
if i == 0:
setattr(node, "parent", root)
else:
setattr(node, "parent", nodes[i - 1])
return nodes