Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def register_new_edge(edge_id, label, first_char_index, last_char_index, source_node_id, dest_node_id):
    """Create, publish and return a new labelled edge.

    An ``Edge.Created`` event is built from the given arguments, the edge
    entity is obtained by passing that event to ``Edge.mutate``, and the
    event is then handed to ``publish``.

    :return: the entity returned by ``Edge.mutate``
    """
    created_kwargs = dict(
        entity_id=edge_id,
        label=label,
        first_char_index=first_char_index,
        last_char_index=last_char_index,
        source_node_id=source_node_id,
        dest_node_id=dest_node_id,
    )
    created = Edge.Created(**created_kwargs)
    edge = Edge.mutate(event=created)
    # The entity is derived first; the event is published afterwards.
    publish(created)
    return edge
def register_new_suffix_tree(string, case_insensitive=False):
    """Create, publish and return a new suffix tree.

    The tree's ID is the hex form of a freshly generated ``uuid4``. A
    ``SuffixTree.Created`` event carrying that ID, ``string`` and
    ``case_insensitive`` is passed to ``SuffixTree.mutate`` to obtain the
    entity, and the event is then published.

    :return: the entity returned by ``SuffixTree.mutate``
    """
    tree_id = uuid4().hex
    created_kwargs = dict(
        entity_id=tree_id,
        string=string,
        case_insensitive=case_insensitive,
    )
    created = SuffixTree.Created(**created_kwargs)
    tree = SuffixTree.mutate(event=created)
    publish(created)
    return tree
def register_new_node(suffix_node_id=None):
    """Create, publish and return a new node.

    A fresh ``uuid4`` is used as the originator ID of a ``Node.Created``
    event, which also carries ``suffix_node_id``. The node entity is
    obtained from ``Node.mutate`` and the event is then published.

    :return: the entity returned by ``Node.mutate``
    """
    new_id = uuid4()
    created = Node.Created(
        originator_id=new_id,
        suffix_node_id=suffix_node_id,
    )
    node = Node.mutate(event=created)
    # Here the event is published wrapped in a one-item list.
    publish([created])
    return node
def register_new_edge(edge_id, first_char_index, last_char_index, source_node_id, dest_node_id):
    """Create, publish and return a new edge.

    An ``Edge.Created`` event is built from the given arguments, the edge
    entity is obtained by passing that event to ``Edge.mutate``, and the
    event is then handed to ``publish``.

    :return: the entity returned by ``Edge.mutate``
    """
    created_kwargs = dict(
        entity_id=edge_id,
        first_char_index=first_char_index,
        last_char_index=last_char_index,
        source_node_id=source_node_id,
        dest_node_id=dest_node_id,
    )
    created = Edge.Created(**created_kwargs)
    edge = Edge.mutate(event=created)
    publish(created)
    return edge
def register_sequence(self, sequence_id):
    """
    Start a new sequence and remember its meta entity on this object.

    Builds a ``SequenceMeta.Started`` event for ``sequence_id``, derives the
    entity via ``SequenceMeta._mutate`` (with no initial state), publishes
    the event, then stores the entity as ``self._meta``.

    :rtype: SequenceMeta
    """
    started = SequenceMeta.Started(originator_id=sequence_id)
    meta = SequenceMeta._mutate(initial=None, event=started)
    publish(started)
    # Stored only after publishing, so a failing publish leaves _meta untouched.
    self._meta = meta
    return meta
def publish_prompt(self, _: Optional[IterableOfEvents] = None) -> None:
    """
    Publishes a prompt for this application.

    Used to prompt downstream process applications when an event
    is published by this application's model, which can happen
    when application command methods, rather than the process policy,
    are called.

    Wraps exceptions with PromptFailed, to avoid application policy
    exceptions being seen directly in other applications when running
    synchronously in a single thread. The original exception is chained
    as the cause so it remains available for debugging.

    :param _: ignored; accepted so this method can be used as an event handler.
    :raises PromptFailed: if publishing the prompt raises any exception.
    """
    prompt = PromptToPull(self.name, self.pipeline_id)
    try:
        publish([prompt])
    except PromptFailed:
        # Already the right type: propagate unchanged rather than re-wrap.
        raise
    except Exception as e:
        # Explicit "from e" records the underlying error as __cause__.
        raise PromptFailed("{}: {}".format(type(e), str(e))) from e
def discard(self):
    """Apply and publish a discarded event for this entity.

    First checks the entity has not already been discarded. The event class
    is this object's ``Discarded`` attribute when it has one, otherwise the
    module-level ``Discarded``. The event carries the entity's current ID
    and version, is applied to the entity, and is then published.
    """
    self._assert_not_discarded()
    discarded_cls = getattr(self, 'Discarded', Discarded)
    discarded = discarded_cls(
        entity_id=self._id,
        entity_version=self._version,
    )
    self._apply(discarded)
    publish(discarded)
def __publish_to_subscribers__(self, event):
    """
    Hand the given event over to ``publish``.

    The argument is passed through unchanged.

    :param event: domain event or list of events
    """
    publish(event)
def beat_heart(self):
    """Apply and publish a heartbeat event for this entity.

    First checks the entity has not been discarded. A ``self.Heartbeat``
    event carrying the entity's current ID and version is applied to the
    entity and then published.
    """
    self._assert_not_discarded()
    heartbeat = self.Heartbeat(
        entity_id=self._id,
        entity_version=self._version,
    )
    self._apply(heartbeat)
    publish(heartbeat)