Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test(self):
# Setup the example application, use it as a context manager.
with ExampleApplicationWithSQLAlchemy(db_uri='sqlite:///:memory:') as app:
# Check there's a DB session.
self.assertIsInstance(app.db_session, ScopedSession)
# Check there's a stored event repo.
self.assertIsInstance(app.stored_event_repo, StoredEventRepository)
self.assertEqual(app.stored_event_repo.db_session, app.db_session)
# Check there's an event store.
self.assertIsInstance(app.event_store, EventStore)
self.assertEqual(app.event_store.stored_event_repo, app.stored_event_repo)
# Check there's a persistence subscriber.
self.assertIsInstance(app.persistence_subscriber, PersistenceSubscriber)
self.assertEqual(app.persistence_subscriber.event_store, app.event_store)
# Check there's an example repository.
self.assertIsInstance(app.example_repo, ExampleRepository)
assert isinstance(app, ExampleApplicationWithSQLAlchemy) # For PyCharm...
# Register a new example.
example1 = app.register_new_example(a=10, b=20)
self.assertIsInstance(example1, Example)
def test(self):
"""Checks self.app both is and work in the way an
instance of a subclass of ExampleApplication should do.
"""
# Check we're dealing with an example application.
assert isinstance(self.app, ExampleApplication)
# Check there's a stored event repo.
self.assertIsInstance(self.app.stored_event_repo, StoredEventRepository)
# Check there's an event store.
self.assertIsInstance(self.app.event_store, EventStore)
self.assertEqual(self.app.event_store.stored_event_repo, self.app.stored_event_repo)
# Check there's a persistence subscriber.
self.assertIsInstance(self.app.persistence_subscriber, PersistenceSubscriber)
self.assertEqual(self.app.persistence_subscriber.event_store, self.app.event_store)
# Check there's an example repository.
self.assertIsInstance(self.app.example_repo, ExampleRepo)
# Register a new example.
example1 = self.app.register_new_example(a=10, b=20)
self.assertIsInstance(example1, Example)
# Check the example is available in the repo.
entity1 = self.app.example_repo[example1.id]
self.assertEqual(10, entity1.a)
def setUp(self):
# Setup the persistence subscriber.
self.event_store = EventStore(PythonObjectsStoredEventRepository())
self.persistence_subscriber = PersistenceSubscriber(event_store=self.event_store)
def test_get_entity_events(self):
repo = PythonObjectsStoredEventRepository()
event_store = EventStore(stored_event_repo=repo)
# Check there are zero stored events in the repo.
entity_events = event_store.get_entity_events(stored_entity_id='Example::entity1')
entity_events = list(entity_events)
self.assertEqual(0, len(entity_events))
# Store a domain event.
event1 = Example.Created(entity_id='entity1', a=1, b=2)
event_store.append(event1)
# Check there is one event in the event store.
entity_events = event_store.get_entity_events(stored_entity_id='Example::entity1')
entity_events = list(entity_events)
self.assertEqual(1, len(entity_events))
# Store another domain event.
def test_snapshots(self):
stored_event_repo = PythonObjectsStoredEventRepository()
event_store = EventStore(stored_event_repo)
self.ps = PersistenceSubscriber(event_store)
event_player = EventPlayer(event_store=event_store, id_prefix='Example', mutate_func=Example.mutate)
# Create a new entity.
registered_example = register_new_example(a=123, b=234)
# Take a snapshot.
snapshot = take_snapshot(registered_example, uuid1().hex)
# Replay from this snapshot.
after = snapshot.at_event_id
initial_state = entity_from_snapshot(snapshot)
retrieved_example = event_player.replay_events(registered_example.id, initial_state=initial_state, after=after)
# Check the attributes are correct.
self.assertEqual(retrieved_example.a, 123)
def test_get_item(self):
# Setup an event store.
stored_event_repo = PythonObjectsStoredEventRepository()
event_store = EventStore(stored_event_repo=stored_event_repo)
# Put an event in the event store.
entity_id = 'entity1'
event_store.append(Example.Created(entity_id=entity_id, a=1, b=2))
# Setup an example repository.
example_repo = ExampleRepo(event_store=event_store)
# Check the repo has the example.
self.assertIn(entity_id, example_repo)
self.assertNotIn('xxxxxxxx', example_repo)
# Check the entity attributes.
example = example_repo[entity_id]
self.assertEqual(1, example.a)
self.assertEqual(2, example.b)
def test_get_entity(self):
# Setup an event store, using Python objects.
event_store = EventStore(stored_event_repo=PythonObjectsStoredEventRepository())
# Store example events.
event1 = Example.Created(entity_id='entity1', a=1, b=2)
event_store.append(event1)
event2 = Example.Created(entity_id='entity2', a=2, b=4)
event_store.append(event2)
event3 = Example.Created(entity_id='entity3', a=3, b=6)
event_store.append(event3)
event4 = Example.Discarded(entity_id='entity3', entity_version=1)
event_store.append(event4)
# Check the event sourced entities are correct.
# - just use a trivial mutate that always instantiates the 'Example'.
event_player = EventPlayer(event_store=event_store, id_prefix='Example', mutate_func=Example.mutate)
# The the reconstituted entity has correct attribute values.
def get_snapshot(stored_entity_id, event_store, until=None):
"""
Get the last snapshot for entity.
:rtype: Snapshot
"""
assert isinstance(event_store, EventStore)
snapshot_entity_id = make_stored_entity_id(id_prefix_from_event_class(Snapshot), stored_entity_id)
return event_store.get_most_recent_event(snapshot_entity_id, until=until)