Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
return when.try(self._eventSink.sink.bind(self._eventSink), commitObject).then(function _commitSinkSucceeded(result) {
// Now that the commit is sunk, we can clear the event staging area - new events will end up in subsequent commits.
self._stagedEvents = [];
self._updateSequenceNumber(commitObject.sequenceSlot);
//NOTE: The check/log emission below is a good candidate for refactoring into Aspect-Oriented Programming.
// ESDF Core does not support AOP as of now, though.
if(self._IOObserver){
self._IOObserver.emit('CommitSinkSuccess', {
commitObject: commitObject
});
}
// Now that the commit has been saved, we proceed to save a snapshot if the snapshotting strategy tells us to (and we have a snapshot save provider).
// Note that _snapshotStrategy is called with "this" set to the current aggregate, which makes it behave like a private method.
if (self.supportsSnapshotGeneration() && self._snapshotter && self._snapshotStrategy && self._snapshotStrategy(commitObject)) {
when.try(self._saveSnapshot.bind(self)).catch(function(error) {
//TODO: We should not be using console directly, but there is currently
// no way to inject a custom logger.
console.error('Error saving snapshot for %s [%s]: %s', self._aggregateID, self._aggregateType, error);
});
// Since saving a snapshot is never mandatory for correct operation of an event-sourced application, we do not have to react to errors.
}
return result;
}, function _commitSinkFailed(reason) {
// Sink failed - do nothing. An upper layer can either retry the sinking, or reload the aggregate and retry (in the latter case, the sequence number will probably get refreshed).
readable._read = function _read(length) {
var self = this;
when.try(reader, sequenceID, currentOffset).then(function(commits) {
if (!Array.isArray(commits) && commits !== null) {
throw new Error('Expected an array of Commit objects, but got an unknown type from the commit reader function');
}
// Move forward in the stream:
if (Array.isArray(commits)) {
currentOffset += commits.length;
}
// Only perform one push(). Multiple pushes from one _read() are apparently a sure-fire way to cause a huge memory leak.
self.push(commits);
}).catch(function(error) {
self.emit('error', error);
});
};
}
}).then(function runUserFunction(loadingResult) {
var aggregateInstance = loadingResult.instance;
var stagedCommit;
return when.try(userFunction, aggregateInstance).then(function saveAggregateState(userFunctionResult) {
// Get the events staged by the aggregate root in course of execution and eventually append them to the result if requested.
try {
stagedCommit = aggregateInstance.getCommit(options.commitMetadata || {});
} catch (commitConstructionError) {
// no-op: we've failed to construct the commit; maybe the aggregate
// root instance has no ID assigned? Anyway, commit() will either
// fail or succeed without writing anything (0 events). This is not
// a common case, but a useful one when loading "dummy entities"
// which are guaranteed to be in their initial state. This also
// ensures compatibility with esdf 0.1.x.
}
// Actually commit:
return when.try(aggregateInstance.commit.bind(aggregateInstance), options.commitMetadata || {}).then(function _buildOutput() {
// If the caller has requested an "advanced format" result, pass the data through to them, enriched with the result of the user function.
if (options.advanced) {
DummyEventStore.prototype.streamSequenceCommits = function streamSequenceCommits(sequenceID, since, commitCallback){
return when.try(this._faultInjector.bind(this), 'streamSequenceCommits', sequenceID, since, commitCallback).then((function(){
return when.promise((function retrieveDummyStreamContents(resolve, reject){
// If there is no sequence under this ID, pretend we have one that is empty, instead.
var sequence = (this._sequences[sequenceID] || []).slice(since - 1);
var commitCount = sequence.length;
function processCommit(index){
if(index === commitCount){
resolve();
return;
}
var commit = sequence[index];
when.try(commitCallback, commit).done(processCommit.bind(undefined, index + 1), reject);
}
processCommit(0);
}).bind(this));
}).bind(this));
function singlePass(){
function logCompletion(type, envelope){
// Guard clause: If neither tracing nor logging have been requested, there is no point in generating Occurences.
if(!generateLogs){
return when.resolve(envelope);
}
return when.try(log, Occurence(type, {
envelope: envelope.copy()
})).yield(envelope);
}
return when.try(load)
.then(logCompletion.bind(undefined, 'loaded'))
.then(process)
.then(logCompletion.bind(undefined, 'processed'))
.then(optionalSeal)
.then(save)
.then(logCompletion.bind(undefined, 'saved'));
}
function nominalLoad(){
// Actual retrieval/construction/rehydration:
var ARObject = constructAggregate();
return when.try(snapshotter.loadSnapshot.bind(snapshotter), aggregateType, ARID).then(function _applySnapshot(snapshot){
// A snapshot has been found and loaded, so let the AR apply it to itself, according to its internal logic.
return when.try(ARObject.applySnapshot.bind(ARObject), snapshot);
}, function _snapshotNonexistent(){
// This function intentionally does nothing. It simply turns a rejection from loadSnapshot() into a resolution.
}).then(rehydrateAggregate.bind(undefined, ARObject)).then(function(){
return ARObject;
});
}
function rehydrateAggregate(ARObject){
return when.try(eventSink.rehydrate.bind(eventSink), ARObject, ARID, ARObject.getNextSequenceNumber());
}
C.open = function() {
return when.try(this.allocate.bind(this)).then(
function(ch) {
return ch.rpc(defs.ChannelOpen, {outOfBand: ""},
defs.ChannelOpenOk);
});
};
function tryTask() {
return when.try(taskContainer.task).then(function() {
self._queues[resourceID].pendingTasksForResource -= 1;
if (self._queues[resourceID].pendingTasksForResource <= 0) {
self._queues[resourceID].pause();
delete self._queues[resourceID];
}
}).then(taskContainer.fulfill, function(error) {
self._logger(resourceID, error);
if (!self._options.retryDelay) {
return tryTask();
}
else {
return when.resolve().delay(self._options.retryDelay).then(tryTask);
}
});
}
export function collection(username){
return when.try(function(boardgames, expansions){
if(boardgames.entity.message){
return boardgames.entity;
}
if(expansions.entity.message){
return expansions.entity;
}
let games = boardgames.entity.items.item || [];
return games.concat(expansions.entity.items.item || []);
},
client({path: `collection?stats=1&username=${username}&version=1&subtype=boardgame&excludesubtype=boardgameexpansion`}),
client({path: `collection?stats=1&username=${username}&version=1&subtype=boardgameexpansion`}));
}