Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.assertRaises(ProgrammingError, mutableproperty, 123)
self.assertRaises(ProgrammingError, mutableproperty, None)
# Call the decorator with a function.
getter = lambda: None
p = mutableproperty(getter)
# Check we got a property object.
self.assertIsInstance(p, property)
# Check the property object has both setter and getter functions.
self.assertTrue(p.fset)
self.assertTrue(p.fget)
# Pretend we decorated an object.
o = EventSourcedEntity(entity_id='1', entity_version=1, domain_event_id=1)
o.__dict__['_'] = 'value1'
# Call the property's getter function.
value = p.fget(o)
self.assertEqual(value, 'value1')
# Call the property's setter function.
p.fset(o, 'value2')
# Check the attribute has changed.
value = p.fget(o)
self.assertEqual(value, 'value2')
# Check the property's getter function isn't the getter function we passed in.
self.assertNotEqual(p.fget, getter)
def suffix_node_id(self):
    """Return this node's suffix link.

    The link is the id of another node that has a matching suffix;
    ``None`` means that this node has no suffix link.
    """
    return self._suffix_node_id
def __repr__(self):
    """Return a short debugging representation showing the suffix link.

    The suffix link is formatted with ``%s`` rather than ``%d`` because it
    may be ``None`` (no suffix link) or a non-numeric id, either of which
    would make ``%d`` raise ``TypeError``. Integer ids render exactly as
    they did before.
    """
    # Wrap in a tuple so an id that is itself a tuple is formatted as one value.
    return "Node(suffix link: %s)" % (self.suffix_node_id,)
class Edge(EventSourcedEntity):
"""An edge in the suffix tree.
"""
class Created(EventSourcedEntity.Created): pass
class AttributeChanged(EventSourcedEntity.AttributeChanged): pass
class Discarded(EventSourcedEntity.Discarded): pass
def __init__(self, label, first_char_index, last_char_index, source_node_id, dest_node_id, **kwargs):
    """Set up an edge from its label, character indices and end nodes.

    Any remaining keyword arguments are handed to the base class
    initialiser unchanged; the other arguments are stored on private
    attributes of the same name (prefixed with an underscore).
    """
    super(Edge, self).__init__(**kwargs)
    self._label = label
    # Character range of the edge.
    self._first_char_index, self._last_char_index = first_char_index, last_char_index
    # Ids of the nodes the edge runs between.
    self._source_node_id, self._dest_node_id = source_node_id, dest_node_id
@mutableproperty
def label(self):
"""String part represented by this edge.
if not suffix.explicit():
if len(string) < suffix.first_char_index + 1:
raise AssertionError
edge_id = make_edge_id(suffix.source_node_id, string[suffix.first_char_index])
e = self.get_edge(edge_id)
if e.length <= suffix.length:
suffix.first_char_index += e.length + 1
suffix.source_node_id = e.dest_node_id
self._canonize_suffix(suffix, string)
class SuffixTreeNode(EventSourcedEntity):
"""A node in the suffix tree.
"""
class Created(EventSourcedEntity.Created): pass
class AttributeChanged(EventSourcedEntity.AttributeChanged): pass
class Discarded(EventSourcedEntity.Discarded): pass
class ChildNodeAdded(DomainEvent): pass
class ChildNodeRemoved(DomainEvent): pass
def __init__(self, suffix_node_id=None, string_id=None, *args, **kwargs):
    """Set up a node with an optional suffix link and string id.

    Extra positional and keyword arguments are passed straight to the
    base class initialiser. The node starts with no child node ids.
    """
    super(SuffixTreeNode, self).__init__(*args, **kwargs)
    self._suffix_node_id, self._string_id = suffix_node_id, string_id
    # Child node ids are kept in an (initially empty) ordered mapping.
    self._child_node_ids = OrderedDict()
@mutableproperty
return new_node.id
def _canonize_suffix(self, suffix):
    """Canonize the suffix in place.

    Walks along the suffix string until the suffix is explicit or the
    next edge is longer than the suffix. For every edge that is not
    longer than the suffix, the suffix's first character index is moved
    forward and its source node becomes the edge's destination node.

    This is written as a loop instead of tail recursion so that a long
    chain of edges cannot exhaust the interpreter's recursion limit. The
    sequence of updates applied to ``suffix`` is the same.
    """
    while not suffix.explicit():
        edge_id = make_edge_id(suffix.source_node_id, self.string[suffix.first_char_index])
        e = self.edges[edge_id]
        if e.length > suffix.length:
            # The edge is longer than the suffix: nothing more to step over.
            break
        # Step over the whole edge and continue from its destination node.
        suffix.first_char_index += e.length + 1
        suffix.source_node_id = e.dest_node_id
class Node(EventSourcedEntity):
"""A node in the suffix tree.
"""
class Created(EventSourcedEntity.Created): pass
class AttributeChanged(EventSourcedEntity.AttributeChanged): pass
class Discarded(EventSourcedEntity.Discarded): pass
def __init__(self, suffix_node_id=None, *args, **kwargs):
    """Set up a node, optionally with a suffix link.

    Extra positional and keyword arguments are passed straight to the
    base class initialiser; ``suffix_node_id`` is stored on
    ``_suffix_node_id``.
    """
    super(Node, self).__init__(*args, **kwargs)
    self._suffix_node_id = suffix_node_id
@mutableproperty
def suffix_node_id(self):
"""The id of a node with a matching suffix, representing a suffix link.
e = self.get_edge(edge_id)
if e.length <= suffix.length:
suffix.first_char_index += e.length + 1
suffix.source_node_id = e.dest_node_id
self._canonize_suffix(suffix, string)
class SuffixTreeNode(EventSourcedEntity):
"""A node in the suffix tree.
"""
class Created(EventSourcedEntity.Created): pass
class AttributeChanged(EventSourcedEntity.AttributeChanged): pass
class Discarded(EventSourcedEntity.Discarded): pass
class ChildNodeAdded(DomainEvent): pass
class ChildNodeRemoved(DomainEvent): pass
def __init__(self, suffix_node_id=None, string_id=None, *args, **kwargs):
    """Create a node, recording its suffix link and string id.

    Positional and keyword arguments beyond the first two are forwarded
    to the base class initialiser. No child node ids are registered yet.
    """
    super(SuffixTreeNode, self).__init__(*args, **kwargs)
    self._suffix_node_id, self._string_id = suffix_node_id, string_id
    # Start with an empty ordered mapping of child node ids.
    self._child_node_ids = OrderedDict()
@mutableproperty
def suffix_node_id(self):
"""The id of a node with a matching suffix, representing a suffix link.
None indicates this node has no suffix link.
import uuid
from eventsourcing.domain.model.entity import EventSourcedEntity, attribute, EntityRepository, entity_mutator, \
singledispatch, Created, AttributeChanged, Discarded
from eventsourcing.domain.model.events import publish, DomainEvent
class Example(EventSourcedEntity):
"""
An example event sourced domain model entity.
"""
# Make sure events that are applied to the entity have originated
# from the entity at the version the instance it is current at.
# - this assumes _validate_originator() is called in mutators.
__always_validate_originator_version__ = True
class Created(Created):
pass
class AttributeChanged(AttributeChanged):
pass
class Discarded(Discarded):
self._increment_version()
return self
@suffix_tree_node_mutator.register(SuffixTreeNode.ChildNodeRemoved)
def child_node_removed_mutator(event, self):
    """Apply a ``ChildNodeRemoved`` event to a suffix tree node.

    Removes ``event.child_node_id`` from the node's child node ids,
    increments the node's version, and returns the node. An id that is
    not present is ignored.
    """
    assert isinstance(self, SuffixTreeNode), self
    # pop() with a default makes removal of an absent id a no-op.
    self._child_node_ids.pop(event.child_node_id, None)
    self._increment_version()
    return self
class SuffixTreeEdge(EventSourcedEntity):
"""An edge in the suffix tree.
"""
# Event classes for this entity; each one subclasses the event of the
# same name on EventSourcedEntity and adds nothing of its own.
class Created(EventSourcedEntity.Created):
pass
class AttributeChanged(EventSourcedEntity.AttributeChanged):
pass
class Discarded(EventSourcedEntity.Discarded):
pass
# Stores the label and source node id on private attributes; remaining
# keyword arguments are passed to the base class initialiser.
# NOTE(review): dest_node_id is accepted but never stored in the lines
# visible here. The listing looks truncated at this point (Edge.__init__
# in this file assigns self._dest_node_id) - confirm against the full source.
def __init__(self, label, source_node_id, dest_node_id, **kwargs):
super(SuffixTreeEdge, self).__init__(**kwargs)
self._label = label
self._source_node_id = source_node_id
import uuid
from eventsourcing.domain.model.entity import EventSourcedEntity, mutableproperty, EntityRepository, entity_mutator, \
singledispatch
from eventsourcing.domain.model.events import publish, DomainEvent
class Example(EventSourcedEntity):
"""
An example event sourced domain model entity.
"""
# Needed to get an event history longer than 10000 in Cassandra.
__page_size__ = 1000
# Make sure events that are applied to the entity have originated
# from the entity at the version the instance it is current at.
# - this assumes _validate_originator() is called in mutators.
__always_validate_originator_version__ = True
class Created(EventSourcedEntity.Created):
pass
class AttributeChanged(EventSourcedEntity.AttributeChanged):