Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async detached(receiverError?: AmqpError | Error): Promise {
const connectionId = this._context.namespace.connectionId;
try {
const wasCloseInitiated = this._receiver && this._receiver.isClosed();
// Clears the token renewal timer. Closes the link and its session if they are open.
// Removes the link and its session if they are present in rhea's cache.
await this._closeLink(this._receiver);
// For session_close and receiver_close this should attempt to reopen
// only when the receiver(sdk) did not initiate the close) OR
// if an error is present and the error is retryable.
let shouldReopen = false;
if (receiverError && !wasCloseInitiated) {
const translatedError = translate(receiverError);
if (translatedError.retryable) {
shouldReopen = true;
log.error(
"[%s] close() method of Receiver '%s' with address '%s' was not called. There " +
"was an accompanying error and it is retryable. This is a candidate for re-establishing " +
"the receiver link.",
connectionId,
this.name,
this.address
);
} else {
log.error(
"[%s] close() method of Receiver '%s' with address '%s' was not called. There " +
"was an accompanying error and it is NOT retryable. Hence NOT re-establishing " +
"the receiver link.",
connectionId,async detached(receiverError?: AmqpError | Error): Promise {
try {
const wasCloseInitiated = this._receiver && this._receiver.isItselfClosed();
// Clears the token renewal timer. Closes the link and its session if they are open.
// Removes the link and its session if they are present in rhea's cache.
await this._closeLink(this._receiver);
// We should attempt to reopen only when the receiver(sdk) did not initiate the close
let shouldReopen = false;
if (receiverError && !wasCloseInitiated) {
const translatedError = translate(receiverError);
if (translatedError.retryable) {
shouldReopen = true;
log.error(
"[%s] close() method of Receiver '%s' with address '%s' was not called. There " +
"was an accompanying error and it is retryable. This is a candidate for re-establishing " +
"the receiver link.",
this._context.connectionId,
this.name,
this.address
);
} else {
log.error(
"[%s] close() method of Receiver '%s' with address '%s' was not called. There " +
"was an accompanying error and it is NOT retryable. Hence NOT re-establishing " +
"the receiver link.",
this._context.connectionId,if (this.receiverType === ReceiverType.batching) {
log.error(
"[%s] Receiver '%s' with address '%s' is a Batching Receiver, so we will not be " +
"re-establishing the receiver link.",
connectionId,
this.name,
this.address
);
return;
}
// We should attempt to reopen only when the receiver(sdk) did not initiate the close
let shouldReopen = false;
if (receiverError && !wasCloseInitiated) {
const translatedError = translate(receiverError);
if (translatedError.retryable) {
shouldReopen = true;
log.error(
"[%s] close() method of Receiver '%s' with address '%s' was not called. There " +
"was an accompanying error and it is retryable. This is a candidate for re-establishing " +
"the receiver link.",
connectionId,
this.name,
this.address
);
} else {
log.error(
"[%s] close() method of Receiver '%s' with address '%s' was not called. There " +
"was an accompanying error and it is NOT retryable. Hence NOT re-establishing " +
"the receiver link.",
connectionId,// completing the message.
if (
this.autoComplete &&
this.receiveMode === ReceiveMode.peekLock &&
!bMessage.delivery.remote_settled
) {
try {
log[this.receiverType](
"[%s] Auto completing the message with id '%s' on " + "the receiver '%s'.",
connectionId,
bMessage.messageId,
this.name
);
await bMessage.complete();
} catch (completeError) {
const translatedError = translate(completeError);
log.error(
"[%s] An error occurred while completing the message with id '%s' on the " +
"receiver '%s': %O.",
connectionId,
bMessage.messageId,
this.name,
translatedError
);
this._onError!(translatedError);
}
}
};log.mgmt(
"[%s] Add Rule request body: %O.",
this._context.namespace.connectionId,
request.body
);
log.mgmt(
"[%s] Acquiring lock to get the management req res link.",
this._context.namespace.connectionId
);
await defaultLock.acquire(this.managementLock, () => {
return this._init();
});
await this._mgmtReqResLink!.sendRequest(request);
} catch (err) {
const error = translate(err);
log.error(
"An error occurred while sending the Add rule request to $management " + "endpoint: %O",
error
);
throw error;
}
}
}}
await this._ensureTokenRenewal();
} else {
log.error(
"[%s] The receiver '%s' with address '%s' is open -> %s and is connecting " +
"-> %s. Hence not reconnecting.",
connectionId,
this.name,
this.address,
this.isOpen(),
this.isConnecting
);
}
} catch (err) {
this.isConnecting = false;
err = translate(err);
log.error(
"[%s] An error occured while creating the receiver '%s': %O",
this._context.namespace.connectionId,
this.name,
err
);
throw err;
}
}this._onSessionError = (context: EventContext) => {
const receiver = this._receiver || context.receiver!;
const sessionError = context.session && context.session.error;
if (sessionError) {
const ehError = translate(sessionError);
log.error(
"[%s] An error occurred on the session for Receiver '%s': %O.",
this._context.connectionId,
this.name,
ehError
);
if (receiver && !receiver.isSessionItselfClosed() && !ehError.retryable) {
log.error(
"[%s] Since the user did not close the receiver and the session error is not " +
"retryable, we let the user know about it by calling the user's error handler.",
this._context.connectionId
);
this._onError!(ehError);
}
}
};this._onAmqpError = (context: EventContext) => {
const connectionId = this._context.namespace.connectionId;
const receiverError = context.receiver && context.receiver.error;
if (receiverError) {
const sbError = translate(receiverError);
if (sbError.name === "SessionLockLostError") {
this._context.expiredMessageSessions[this.sessionId!] = true;
sbError.message = `The session lock has expired on the session with id ${
this.sessionId
}.`;
}
log.error(
"[%s] An error occurred for Receiver '%s': %O.",
connectionId,
this.name,
sbError
);
this._notifyError(sbError);
}
};this._onSessionError = (context: EventContext) => {
const connectionId = this._context.namespace.connectionId;
const receiver = this._receiver || context.receiver!;
const sessionError = context.session && context.session.error;
if (sessionError) {
const sbError = translate(sessionError);
log.error(
"[%s] An error occurred on the session for Receiver '%s': %O.",
connectionId,
this.name,
sbError
);
if (receiver && !receiver.isSessionItselfClosed() && !sbError.retryable) {
log.error(
"[%s] Since the user did not close the receiver and the session error is not " +
"retryable, we let the user know about it by calling the user's error handler.",
connectionId
);
this._onError!(sbError);
}
}
if (this._newMessageReceivedTimer) {this._onSessionError = (context: EventContext) => {
const connectionId = this._context.namespace.connectionId;
const receiver = this._receiver || context.receiver!;
const sessionError = context.session && context.session.error;
if (sessionError) {
const sbError = translate(sessionError);
log.error(
"[%s] An error occurred on the session for Receiver '%s': %O.",
connectionId,
this.name,
sbError
);
if (receiver && !receiver.isSessionItselfClosed() && !sbError.retryable) {
log.error(
"[%s] Since the user did not close the receiver and the session error is not " +
"retryable, we let the user know about it by calling the user's error handler.",
connectionId
);
this._onError!(sbError);
}
}
if (this._newMessageReceivedTimer) {