Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
it("throws when token is canceled", async () => {
await expect(new AsyncMutex().lock(Cancelable.canceled)).rejects.toThrow(CancelError);
});
});
private async _lockWriter(handle: LockHandle, upgradeable: LockHandle | undefined, cancelable?: Cancelable) {
Cancelable.throwIfSignaled(cancelable);
while (true) {
if (this._writer === handle) {
throw new Error("Lock already taken.");
}
if (this._upgradeable !== upgradeable) {
throw new Error("Lock already released.");
}
if (this._canTakeWriteLock()) {
this._writer = handle;
return;
}
await this._writerQueue.wait(cancelable);
}
}
async wait(cancelable?: Cancelable): Promise {
Cancelable.throwIfSignaled(cancelable);
if (this._currentCount > 0) {
this._currentCount--;
return;
}
await this._waiters.wait(cancelable);
}
async wait(cancelable?: Cancelable): Promise {
Cancelable.throwIfSignaled(cancelable);
if (this._signaled) {
this._signaled = false;
return;
}
await this._waiters.wait(cancelable);
}
}
async wait(cancelable?: Cancelable): Promise {
Cancelable.throwIfSignaled(cancelable);
if (this._signaled) {
return;
}
await this._waiters.wait(cancelable);
}
}
async signalAndWait(cancelable?: Cancelable): Promise {
Cancelable.throwIfSignaled(cancelable);
if (this._isExecutingPostPhaseAction) throw new Error("This method may not be called from within the postPhaseAction.");
if (this._participantCount === 0) throw new Error("The barrier has no registered participants.");
if (this._remainingParticipants === 0) throw new Error("The number of operations using the barrier exceeded the number of registered participants.");
const waiter = this._waiters.wait(cancelable);
this._remainingParticipants--;
if (this._remainingParticipants === 0) {
this._finishPhase();
}
await waiter;
}
private async _lockReader(handle: LockHandle, upgradeable: boolean, cancelable?: Cancelable) {
Cancelable.throwIfSignaled(cancelable);
while (true) {
if (this._readers.has(handle)) {
throw new Error("Lock already taken.");
}
if (this._canTakeReadLock() && !(upgradeable && this._upgradeable)) {
this._readers.add(handle);
if (upgradeable) {
this._upgradeable = handle;
}
return;
}
await this._readerQueue.wait(cancelable);
}
}
private async _lock(handle: LockHandle, cancelable?: Cancelable) {
Cancelable.throwIfSignaled(cancelable);
if (this._handle === handle) {
throw new Error("Lock already taken.");
}
if (this._handle) {
await this._waiters.wait(cancelable);
if (this._handle === handle) {
throw new Error("Lock already taken.");
}
}
this._handle = handle;
}
return new Promise((resolve, reject) => {
let msec: number;
if (typeof cancelable === "number") {
value = _msec as T | PromiseLike;
msec = cancelable;
cancelable = Cancelable.none;
}
else {
msec = _msec as number;
}
Cancelable.throwIfSignaled(cancelable);
const handle = setTimeout(() => {
subscription.unsubscribe();
resolve(value);
}, msec);
const subscription = Cancelable.subscribe(cancelable, () => {
clearTimeout(handle);
reject(new CancelError());
});
});
}
function createCancelSubscription(node: LinkedListNode<() => void>): CancelSubscription {
return CancelSubscription.create(() => {
if (node.detachSelf()) {
node.value = undefined!;
}
});
}