Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
ownershipRequest = this._createPartitionOwnershipRequest(
partitionOwnershipMap,
partitionToClaim
);
}
await this._claimOwnership(ownershipRequest);
}
}
}
// sleep
logger.verbose(
`[${this._id}] Pausing the EventProcessor loop for ${this._loopIntervalInMs} ms.`
);
await delay(this._loopIntervalInMs, abortSignal);
} catch (err) {
logger.warning(`[${this._id}] An error occured within the EventProcessor loop: ${err}`);
logErrorStackTrace(err);
await this._handleSubscriptionError(err);
}
}
}
private async _useInternalQueue(
onMessage: OnMessage,
abortSignal?: AbortSignalLike
): Promise {
let processedMessagesCount = 0;
// allow the event loop to process any blocking code outside
// this code path before sending any events.
await delay(0);
this._usingInternalQueue = true;
while (this._internalQueue.length) {
if (!this._onMessage) {
break;
}
if (abortSignal && abortSignal.aborted) {
break;
}
// These will not be equal if clearHandlers and registerHandlers were called
// in the same tick of the event loop. If onMessage isn't the currently active
// handler, it should stop getting messages from the queue.
if (this._onMessage !== onMessage) {
break;
}
private async _collectExpiredEntries(): Promise {
if (this._map.size === 0) {
return;
}
await delay(this._delayBetweenCleanupInSeconds);
this._cleanupScheduled = false;
for (const key of this._map.keys()) {
if (Date.now() > this._map.get(key)!.getTime()) {
this._map.delete(key);
log.map("Deleted the key '%s' from the map.", key);
}
}
this._scheduleCleanup().catch((err) => {
log.error(
"An error occurred while scheduling the cleanup, after " + "collecting expired entries: %O",
err
);
});
}