Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async onDetached(senderError?: AmqpError | Error): Promise {
try {
const wasCloseInitiated = this._sender && this._sender.isItselfClosed();
// Clears the token renewal timer. Closes the link and its session if they are open.
// Removes the link and its session if they are present in rhea's cache.
await this._closeLink(this._sender);
// We should attempt to reopen only when the sender(sdk) did not initiate the close
let shouldReopen = false;
if (senderError && !wasCloseInitiated) {
const translatedError = translate(senderError);
if (translatedError.retryable) {
shouldReopen = true;
logger.verbose(
"[%s] close() method of Sender '%s' with address '%s' was not called. There " +
"was an accompanying error an it is retryable. This is a candidate for re-establishing " +
"the sender link.",
this._context.connectionId,
this.name,
this.address
);
} else {
logger.verbose(
"[%s] close() method of Sender '%s' with address '%s' was not called. There " +
"was an accompanying error and it is NOT retryable. Hence NOT re-establishing " +
"the sender link.",
this._context.connectionId,
logErrorStackTrace(err);
return reject(err);
} finally {
removeListeners();
}
} else {
// let us retry to send the message after some time.
const msg =
`[${this._context.connectionId}] Sender "${this.name}", ` +
`cannot send the message right now. Please try later.`;
logger.warning(msg);
const amqpError: AmqpError = {
condition: ErrorNameConditionMapper.SenderBusyError,
description: msg
};
reject(translate(amqpError));
}
});
logger.verbose(
"Acquiring lock %s for initializing the session, sender and " +
"possibly the connection.",
this.senderLock
);
try {
const senderOptions = this._createSenderOptions(
getRetryAttemptTimeoutInMs(options.retryOptions)
);
await defaultLock.acquire(this.senderLock, () => {
return this._init(senderOptions);
});
} catch (err) {
removeListeners();
err = translate(err);
logger.warning(
"[%s] An error occurred while creating the sender %s",
this._context.connectionId,
this.name,
err
);
logErrorStackTrace(err);
return reject(err);
}
}
logger.verbose(
"[%s] Sender '%s', credit: %d available: %d",
this._context.connectionId,
this.name,
this._sender!.credit,
logger.verbose(
"[%s] An error occurred on the $management receiver link.. %O",
id,
ehError
);
});
logger.verbose(
"[%s] Created sender '%s' and receiver '%s' links for $management endpoint.",
this._context.connectionId,
this._mgmtReqResLink.sender.name,
this._mgmtReqResLink.receiver.name
);
await this._ensureTokenRenewal();
}
} catch (err) {
err = translate(err);
logger.warning(
"[%s] An error occured while establishing the $management links: %O",
this._context.connectionId,
err
);
logErrorStackTrace(err);
throw err;
}
}
const amqpError = rheaReceiver.error;
if (!amqpError) {
return;
}
if (rheaReceiver.isItselfClosed()) {
logger.verbose(
"[%s] The receiver was closed by the user." +
"Hence not notifying the user's error handler.",
this._context.connectionId
);
return;
}
if (this._onError) {
const error = translate(amqpError);
logger.warning(
"[%s] An error occurred for Receiver '%s': %O.",
this._context.connectionId,
this.name,
error
);
logErrorStackTrace(error);
logger.verbose(
"[%s] Since the user did not close the receiver " +
"we let the user know about it by calling the user's error handler.",
this._context.connectionId
);
this._onError(error);
}
}
if (!this._context.senders[this.name]) this._context.senders[this.name] = this;
await this._ensureTokenRenewal();
} else {
logger.verbose(
"[%s] The sender '%s' with address '%s' is open -> %s and is connecting " +
"-> %s. Hence not reconnecting.",
this._context.connectionId,
this.name,
this.address,
this.isOpen(),
this.isConnecting
);
}
} catch (err) {
this.isConnecting = false;
err = translate(err);
logger.warning(
"[%s] An error occurred while creating the sender %s",
this._context.connectionId,
this.name,
err
);
logErrorStackTrace(err);
throw err;
}
}
// Builds an "OperationTimeoutError" describing the current request's message_id,
// passes it through translate() and hands the result to reject().
// NOTE(review): presumably scheduled as the timeout callback of the surrounding
// request promise - confirm against the caller, which is not visible here.
const actionAfterTimeout = () => {
  const timeoutError: Error = {
    name: "OperationTimeoutError",
    message: `The request with message_id "${request.message_id}" timed out. Please try again later.`
  };
  return reject(translate(timeoutError));
};
);
logErrorStackTrace(err);
reject(err);
}
});
const config: RetryConfig = {
operation: sendOperationPromise,
connectionId: this._context.connectionId,
operationType: RetryOperationType.management,
abortSignal: abortSignal,
retryOptions: retryOptions
};
return (await retry(config)).body;
} catch (err) {
err = translate(err);
logger.warning("An error occurred while making the request to $management endpoint: %O", err);
logErrorStackTrace(err);
throw err;
}
}
// Listen for errors raised on the $management sender link and record them at
// verbose level, prefixed with the id taken from the connection options.
this._mgmtReqResLink.sender.on(SenderEvents.senderError, (context: EventContext) => {
  const connectionId = context.connection.options.id;
  // The raw link error is translated before logging.
  const translatedError = translate(context.sender!.error!);
  logger.verbose(
    "[%s] An error occurred on the $management sender link.. %O",
    connectionId,
    translatedError
  );
});
this._mgmtReqResLink.receiver.on(ReceiverEvents.receiverError, (context: EventContext) => {