Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def construct_event_store(self, event_sequence_id_attr, event_position_attr, record_manager,
                          always_encrypt=False, cipher=None):
    """Build an ``EventStore`` on top of the given record manager.

    A sequenced item mapper is obtained from
    ``self.construct_sequenced_item_mapper`` and then paired with
    ``record_manager`` in a new ``EventStore``.

    :param event_sequence_id_attr: forwarded unchanged to the mapper factory.
    :param event_position_attr: forwarded unchanged to the mapper factory.
    :param record_manager: supplies ``sequenced_item_class`` for the mapper
        and is passed to the ``EventStore``.
    :param always_encrypt: forwarded unchanged to the mapper factory.
    :param cipher: forwarded unchanged to the mapper factory.
    :returns: the newly created ``EventStore``.
    """
    make_mapper = self.construct_sequenced_item_mapper
    mapper = make_mapper(
        sequenced_item_class=record_manager.sequenced_item_class,
        event_sequence_id_attr=event_sequence_id_attr,
        event_position_attr=event_position_attr,
        always_encrypt=always_encrypt,
        cipher=cipher,
    )
    return EventStore(record_manager=record_manager, sequenced_item_mapper=mapper)
def construct_event_store(
    self,
    sequenced_item_mapper_class,
    event_sequence_id_attr,
    event_position_attr,
    record_manager,
    cipher=None,
):
    """Create an ``EventStore`` that uses a mapper of the requested class.

    The mapper is produced by ``self.construct_sequenced_item_mapper`` from
    the arguments given here plus the record manager's
    ``sequenced_item_class``; it is then combined with ``record_manager``
    in a new ``EventStore``.

    :param sequenced_item_mapper_class: forwarded unchanged to the mapper factory.
    :param event_sequence_id_attr: forwarded unchanged to the mapper factory.
    :param event_position_attr: forwarded unchanged to the mapper factory.
    :param record_manager: supplies ``sequenced_item_class`` for the mapper
        and is passed to the ``EventStore``.
    :param cipher: forwarded unchanged to the mapper factory.
    :returns: the newly created ``EventStore``.
    """
    build_mapper = self.construct_sequenced_item_mapper
    item_mapper = build_mapper(
        sequenced_item_mapper_class=sequenced_item_mapper_class,
        sequenced_item_class=record_manager.sequenced_item_class,
        event_sequence_id_attr=event_sequence_id_attr,
        event_position_attr=event_position_attr,
        cipher=cipher,
    )
    return EventStore(
        record_manager=record_manager,
        sequenced_item_mapper=item_mapper,
    )
# Override that runs the parent's construct_event_store() and then creates a
# second EventStore, stored on self.snapshot_store.
def construct_event_store(self):
# Run the parent implementation first.
super(SnapshottingApplication, self).construct_event_store()
# Setup event store for snapshots.
self.snapshot_store = EventStore(
# The record manager comes from the infrastructure factory's
# construct_snapshot_record_manager().
record_manager=self.infrastructure_factory.construct_snapshot_record_manager(),
# The mapper is an instance of self.sequenced_item_mapper_class, built with
# self.sequenced_item_class.
sequenced_item_mapper=self.sequenced_item_mapper_class(
sequenced_item_class=self.sequenced_item_class
),
# NOTE(review): the excerpt stops here, before the EventStore(...) call is
# closed; any further arguments are not visible.
# Class-level attributes (the enclosing class header is not visible in this
# excerpt). The Optional ones default to None; how an unset value is
# interpreted is decided by code outside this excerpt.
record_manager_class: Optional[Type[AbstractRecordManager]] = None
stored_event_record_class: Optional[type] = None
snapshot_record_class: Optional[type] = None
sequenced_item_class: Optional[Type[NamedTuple]] = None
sequenced_item_mapper_class: Optional[Type[SequencedItemMapper]] = None
# Typed as Any, so no particular compressor interface is stated here.
compressor: Any = None
json_encoder_class: Optional[Type[JSONEncoder]] = None
json_decoder_class: Optional[Type[JSONDecoder]] = None
persist_event_type: Optional[PersistEventType] = None
notification_log_section_size: Optional[int] = None
# The only boolean flag in this group; it is False unless overridden.
use_cache: bool = False
# Unlike the attributes above, these two default to concrete classes
# (EventStore and EventSourcedRepository) rather than None.
event_store_class: Type[EventStore] = EventStore
repository_class: Type[EventSourcedRepository] = EventSourcedRepository
# Constructor signature. Every parameter shown after self has a default, so
# all of them are optional for callers.
# NOTE(review): the parameter list is cut off after setup_table in this
# excerpt; any remaining parameters and the method body are not visible.
def __init__(
self,
name: str = "",
persistence_policy: Optional[PersistencePolicy] = None,
persist_event_type: PersistEventType = None,
cipher_key: Optional[str] = None,
compressor: Any = None,
sequenced_item_class: Optional[Type[NamedTuple]] = None,
sequenced_item_mapper_class: Optional[Type[SequencedItemMapper]] = None,
record_manager_class: Optional[Type[AbstractRecordManager]] = None,
stored_event_record_class: Optional[type] = None,
event_store_class: Optional[Type[EventStore]] = None,
snapshot_record_class: Optional[type] = None,
setup_table: bool = True,
def __init__(self, snapshot_store):
    """Keep a reference to the event store used for snapshots.

    :param snapshot_store: the ``EventStore`` instance to hold as
        ``self.snapshot_store``.
    :raises AssertionError: if ``snapshot_store`` is not an ``EventStore``.
    """
    # Explicit check instead of an ``assert`` statement: asserts are removed
    # when Python runs with -O, which would let any object through unnoticed.
    # AssertionError is kept as the exception type so that callers relying on
    # the previous failure mode are unaffected.
    if not isinstance(snapshot_store, EventStore):
        raise AssertionError(
            "snapshot_store must be an EventStore, got {!r}".format(
                type(snapshot_store)
            )
        )
    self.snapshot_store = snapshot_store
# NOTE(review): this excerpt begins part-way through a call whose opening
# line is not visible; the keyword arguments below finish that call.
sequence_id_attr_name=sequence_id_attr_name,
position_attr_name=position_attr_name,
json_encoder_class=json_encoder_class,
json_decoder_class=json_decoder_class,
cipher=cipher,
)
# Build the SQLAlchemy infrastructure factory for the given session.
factory = SQLAlchemyInfrastructureFactory(
session=session,
# Falls back to StoredEventRecord when record_class is falsy (e.g. None).
integer_sequenced_record_class=record_class or StoredEventRecord,
sequenced_item_class=sequenced_item_class,
contiguous_record_ids=contiguous_record_ids,
application_name=application_name,
pipeline_id=pipeline_id,
)
# Obtain the record manager from the factory.
record_manager = factory.construct_integer_sequenced_record_manager()
# Pair the record manager with sequenced_item_mapper (assigned above this
# excerpt, presumably by the truncated call) and return the resulting store.
event_store = EventStore[DomainEvent, AbstractRecordManager](
record_manager=record_manager, event_mapper=sequenced_item_mapper
)
return event_store