Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from eventsourcing.example.domainmodel import AbstractExampleRepository
from eventsourcing.infrastructure.eventsourcedrepository import EventSourcedRepository
class ExampleRepository(EventSourcedRepository, AbstractExampleRepository):
"""
Event sourced repository for the Example domain model entity.
"""
__page_size__ = 1000
self.cipher = self.construct_cipher(cipher_key)
self.compressor = compressor or type(self).compressor
# Default to using zlib compression when encrypting.
if self.cipher and self.compressor is None:
self.compressor = zlib
self.infrastructure_factory: Optional[
InfrastructureFactory[TVersionedEvent]
] = None
self._datastore: Optional[AbstractDatastore] = None
self._event_store: Optional[
AbstractEventStore[TVersionedEvent, BaseRecordManager]
] = None
self._repository: Optional[
EventSourcedRepository[TVersionedEntity, TVersionedEvent]
] = None
self._notification_log: Optional[LocalNotificationLog] = None
self.use_cache = use_cache or type(self).use_cache
if (
self.record_manager_class
or self.infrastructure_factory_class.record_manager_class
):
self.construct_infrastructure()
if setup_table:
self.setup_table()
self.construct_notification_log()
def __init__(
self, event_store: AbstractEventStore, use_cache: bool = False, **kwargs: Any
):
super(EventSourcedRepository, self).__init__(event_store, **kwargs)
# NB If you use the cache, make sure to del entities
# when records fail to write otherwise the cache will
# give an entity that is ahead of the event records,
# and writing more records will give a broken sequence.
self._cache: Dict[UUID, Optional[TVersionedEntity]] = {}
self._use_cache = use_cache
from typing import Any
from eventsourcing.domain.model.array import (
AbstractArrayRepository,
AbstractBigArrayRepository,
)
from eventsourcing.infrastructure.eventsourcedrepository import EventSourcedRepository
class ArrayRepository(AbstractArrayRepository, EventSourcedRepository):
pass
class BigArrayRepository(AbstractBigArrayRepository, EventSourcedRepository):
subrepo_class = ArrayRepository
def __init__(self, array_size: int = 10000, *args: Any, **kwargs: Any):
super(BigArrayRepository, self).__init__(*args, **kwargs)
self._subrepo = self.subrepo_class(
event_store=self.event_store, array_size=array_size
)
@property
def subrepo(self) -> ArrayRepository:
return self._subrepo
from eventsourcing.domain.model.collection import (
AbstractCollectionRepository,
Collection,
)
from eventsourcing.infrastructure.eventsourcedrepository import EventSourcedRepository
class CollectionRepository(EventSourcedRepository, AbstractCollectionRepository):
"""
Event sourced repository for the Collection domain model entity.
from eventsourcing.domain.model.timebucketedlog import (
Timebucketedlog,
TimebucketedlogRepository,
)
from eventsourcing.infrastructure.eventsourcedrepository import EventSourcedRepository
class TimebucketedlogRepo(EventSourcedRepository, TimebucketedlogRepository):
"""
Event sourced repository for the Example domain model entity.
def find_substring(substring, suffix_tree, edge_repo):
"""Returns the index if substring in tree, otherwise -1.
"""
assert isinstance(substring, str)
assert isinstance(suffix_tree, SuffixTree)
assert isinstance(edge_repo, EventSourcedRepository)
if not substring:
return -1
if suffix_tree.case_insensitive:
substring = substring.lower()
curr_node_id = suffix_tree.root_node_id
i = 0
while i < len(substring):
edge_id = make_edge_id(curr_node_id, substring[i])
try:
edge = edge_repo[edge_id]
except RepositoryKeyError:
return -1
ln = min(edge.length + 1, len(substring) - i)
if (
substring[i : i + ln]
!= suffix_tree.string[edge.first_char_index : edge.first_char_index + ln]
from eventsourcing.domain.model.notificationlog import NotificationLog, NotificationLogRepository
from eventsourcing.infrastructure.eventsourcedrepository import EventSourcedRepository
class NotificationLogRepo(EventSourcedRepository, NotificationLogRepository):
"""
Event sourced repository for the Example domain model entity.
"""
domain_class = NotificationLog
record_manager_class: Optional[Type[AbstractRecordManager]] = None
stored_event_record_class: Optional[type] = None
snapshot_record_class: Optional[type] = None
sequenced_item_class: Optional[Type[NamedTuple]] = None
sequenced_item_mapper_class: Optional[Type[SequencedItemMapper]] = None
compressor: Any = None
json_encoder_class: Optional[Type[JSONEncoder]] = None
json_decoder_class: Optional[Type[JSONDecoder]] = None
persist_event_type: Optional[PersistEventType] = None
notification_log_section_size: Optional[int] = None
use_cache: bool = False
event_store_class: Type[EventStore] = EventStore
repository_class: Type[EventSourcedRepository] = EventSourcedRepository
def __init__(
self,
name: str = "",
persistence_policy: Optional[PersistencePolicy] = None,
persist_event_type: PersistEventType = None,
cipher_key: Optional[str] = None,
compressor: Any = None,
sequenced_item_class: Optional[Type[NamedTuple]] = None,
sequenced_item_mapper_class: Optional[Type[SequencedItemMapper]] = None,
record_manager_class: Optional[Type[AbstractRecordManager]] = None,
stored_event_record_class: Optional[type] = None,
event_store_class: Optional[Type[EventStore]] = None,
snapshot_record_class: Optional[type] = None,
setup_table: bool = True,
contiguous_record_ids: bool = True,
from eventsourcing.domain.model.sequence import AbstractCompoundSequenceRepository, AbstractSequenceRepository, \
CompoundSequence, CompoundSequenceMeta, Sequence, SequenceMeta
from eventsourcing.infrastructure.eventsourcedrepository import EventSourcedRepository
from eventsourcing.infrastructure.sequencereader import SequenceReader
class SequenceRepository(EventSourcedRepository, AbstractSequenceRepository):
mutator = SequenceMeta._mutate
def __getitem__(self, sequence_id):
"""
Returns sequence for given ID.
"""
return Sequence(sequence_id=sequence_id, repo=self)
def get_entity(self, entity_id, lt=None, lte=None):
"""
Replays entity using only the 'Started' event.
:rtype: SequenceMeta
"""
return self.event_player.replay_entity(entity_id, limit=1)