Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
this.name,
this.address,
state
);
}
if (shouldReopen) {
// provide a new name to the link while re-connecting it. This ensures that
// the service does not send an error stating that the link is still open.
const options: ReceiverOptions = this._createReceiverOptions(true);
// shall retry forever at an interval of 15 seconds if the error is a retryable error
// else bail out when the error is not retryable or the oepration succeeds.
const config: RetryConfig = {
operation: () => this._init(options),
connectionId: connectionId,
operationType: RetryOperationType.receiverLink,
times: Constants.defaultConnectionRetryAttempts,
delayInSeconds: 15
};
await retry(config);
}
} catch (err) {
log.error(
"[%s] An error occurred while processing detached() of Receiver '%s': %O ",
connectionId,
this.name,
this.address,
err
);
}
}await defaultLock.acquire(this.senderLock, () => {
const options: SenderOptions = this._createSenderOptions({
newName: true
});
// shall retry forever at an interval of 15 seconds if the error is a retryable error
// else bail out when the error is not retryable or the oepration succeeds.
const config: RetryConfig = {
operation: () => this._init(options),
connectionId: this._context.namespace.connectionId!,
operationType: RetryOperationType.senderLink,
times: Constants.defaultConnectionRetryAttempts,
connectionHost: this._context.namespace.config.host,
delayInSeconds: 15
};
return retry(config);
});
}newName: true // provide a new name to the link while re-connecting it. This ensures that
// the service does not send an error stating that the link is still open.
};
// reconnect the receiver link with sequenceNumber of the last received message as the offset
// if messages were received by the receiver before it got disconnected.
if (this._checkpoint.sequenceNumber > -1) {
rcvrOptions.eventPosition = EventPosition.fromSequenceNumber(this._checkpoint.sequenceNumber);
}
const options: ReceiverOptions = this._createReceiverOptions(rcvrOptions);
// shall retry forever at an interval of 15 seconds if the error is a retryable error
// else bail out when the error is not retryable or the oepration succeeds.
const config: RetryConfig = {
operation: () => this._init(options),
connectionId: this._context.connectionId,
operationType: RetryOperationType.receiverLink,
times: Constants.defaultConnectionRetryAttempts,
connectionHost: this._context.config.host,
delayInSeconds: 15
};
await retry(config);
}
} catch (err) {
log.error(
"[%s] An error occurred while processing detached() of Receiver '%s' with address " + "'%s': %O",
this._context.connectionId,
this.name,
this.address,
err
);
}
}await defaultLock.acquire(this.senderLock, () => {
const options: SenderOptions = this._createSenderOptions({
newName: true
});
// shall retry forever at an interval of 15 seconds if the error is a retryable error
// else bail out when the error is not retryable or the oepration succeeds.
const config: RetryConfig = {
operation: () => this._init(options),
connectionId: this._context.namespace.connectionId!,
operationType: RetryOperationType.senderLink,
times: Constants.defaultConnectionRetryAttempts,
connectionHost: this._context.namespace.config.host,
delayInSeconds: 15
};
return retry(config);
});
}"[%s] close() method of Receiver '%s' with address '%s' was called. " +
"by the time the receiver finished getting created. Hence, disallowing messages from being received. ",
connectionId,
this.name,
this.address
);
await this.close();
} else {
if (this._receiver && this.receiverType === ReceiverType.streaming) {
this._receiver.addCredit(this.maxConcurrentCalls);
}
}
}),
connectionId: connectionId,
operationType: RetryOperationType.receiverLink,
times: Constants.defaultConnectionRetryAttempts,
connectionHost: this._context.namespace.config.host,
delayInSeconds: 15
};
if (!this.wasCloseInitiated) {
await retry(config);
}
}
} catch (err) {
log.error(
"[%s] An error occurred while processing detached() of Receiver '%s': %O ",
connectionId,
this.name,
this.address,
err
);
}"by the time the receiver finished getting created. Hence, disallowing messages from being received. ",
connectionId,
this.name,
this.address
);
await this.close();
} else {
if (this._receiver && this.receiverType === ReceiverType.streaming) {
this._receiver.addCredit(this.maxConcurrentCalls);
}
}
return;
}),
connectionId: connectionId,
operationType: RetryOperationType.receiverLink,
times: Constants.defaultConnectionRetryAttempts,
connectionHost: this._context.namespace.config.host,
delayInSeconds: 15
};
if (!this.wasCloseInitiated) {
await retry(config);
}
}
} catch (err) {
log.error(
"[%s] An error occurred while processing detached() of Receiver '%s': %O ",
connectionId,
this.name,
this.address,
err
);
if (typeof this._onError === "function") {