Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@mutableproperty
def a(self):
"A mutable event sourced property."
# Instantiate the class and check assigning to the property publishes an event and updates the object state.
published_events = []
subscription = (lambda x: True, lambda x: published_events.append(x))
subscribe(*subscription)
entity_id = '1'
try:
aaa = Aaa(entity_id=entity_id, entity_version=0, domain_event_id='0', a=1)
self.assertEqual(aaa.a, 1)
aaa.a = 'value1'
self.assertEqual(aaa.a, 'value1')
finally:
unsubscribe(*subscription)
# Check an event was published.
self.assertEqual(len(published_events), 1)
# Check the published event was an AttributeChanged event, with the expected attribute values.
published_event = published_events[0]
self.assertIsInstance(published_event, Aaa.AttributeChanged)
self.assertEqual(published_event.name, '_a')
self.assertEqual(published_event.value, 'value1')
self.assertTrue(published_event.domain_event_id)
self.assertEqual(published_event.entity_id, entity_id)
def close(self) -> None:
unsubscribe(self.add_ticket, self.is_ticket_created)
unsubscribe(self.update_ticket, self.is_ticket_updated)
unsubscribe(self.delete_ticket, self.is_ticket_deleted)
def close(self):
unsubscribe(self.store_event, self.is_event)
# Subscribe the slave actor's send_prompt() method.
# - the process application will call publish_prompt()
# and the actor will receive the prompt and send it
# as a message.
subscribe(predicate=self.is_my_prompt, handler=self.send_prompt)
# Close the process application persistence policy.
# - slave actor process application doesn't publish
# events, so we don't need this
self.process.persistence_policy.close()
# Unsubscribe process application's publish_prompt().
# - slave actor process application doesn't publish
# events, so we don't need this
unsubscribe(
predicate=self.process.persistence_policy.is_event,
handler=self.process.publish_prompt,
)
# Construct and follow upstream notification logs.
for upstream_application_name in self.upstream_application_names:
record_manager = self.process.event_store.record_manager
# assert isinstance(record_manager, ACIDRecordManager), type(record_manager)
notification_log = RecordManagerNotificationLog(
record_manager=record_manager.clone(
application_name=upstream_application_name,
pipeline_id=self.pipeline_id,
),
section_size=self.process.notification_log_section_size,
)
self.process.follow(upstream_application_name, notification_log)
def close(self) -> None:
unsubscribe(self.add_ticket, self.is_ticket_created)
unsubscribe(self.update_ticket, self.is_ticket_updated)
unsubscribe(self.delete_ticket, self.is_ticket_deleted)
def close(self):
super(MultiprocessRunner, self).close()
unsubscribe(handler=self.broadcast_prompt, predicate=self.is_prompt)
for os_process in self.os_processes:
os_process.inbox.put("QUIT")
for os_process in self.os_processes:
os_process.join(timeout=10)
for os_process in self.os_processes:
os_process.is_alive() and os_process.terminate()
self.os_processes = None
self.manager = None
def close(self):
unsubscribe(predicate=self.is_my_prompt, handler=self.send_prompt)
self.process.close()
# the same commands, and running the same processes. The command logging process could
# be accompanied with a result logging process that reads results from replicas as they
# are available. Not sure what to do if replicas return different things. If one replica
# goes down, then it could resume by pulling events from another? Not sure what to do.
# External systems could be modelled as commands.
# Make the process follow the upstream notification log.
self.process.follow(upstream_name, notification_log)
# Subscribe to broadcast prompts published by the process application.
subscribe(handler=self.broadcast_prompt, predicate=self.is_prompt)
try:
self.loop_on_prompts()
finally:
unsubscribe(handler=self.broadcast_prompt, predicate=self.is_prompt)
def close(self) -> None:
if self._persistence_policy:
unsubscribe(
predicate=self._persistence_policy.is_event, handler=self.publish_prompt
)
super(ProcessApplication, self).close()
def close(self):
"""Stops all the actors running a system of process applications."""
super(ActorModelRunner, self).close()
unsubscribe(handler=self.forward_prompt, predicate=self.is_prompt)
if self.shutdown_on_close:
self.shutdown()