Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
protected async _negotiateClaim(setTokenRenewal?: boolean): Promise {
// Acquire the lock and establish a cbs session if it does not exist on the connection.
// Although node.js is single threaded, we need a locking mechanism to ensure that a
// race condition does not happen while creating a shared resource (in this case the
// cbs session, since we want to have exactly 1 cbs session per connection).
log.link(
"[%s] Acquiring cbs lock: '%s' for creating the cbs session while creating the %s: " +
"'%s' with address: '%s'.",
this._context.namespace.connectionId,
this._context.namespace.cbsSession.cbsLock,
this._type,
this.name,
this.address
);
await defaultLock.acquire(this._context.namespace.cbsSession.cbsLock, () => {
return this._context.namespace.cbsSession.init();
});
let tokenObject: AccessToken;
let tokenType: TokenType;
if (this._context.namespace.tokenCredential instanceof SharedKeyCredential) {
tokenObject = this._context.namespace.tokenCredential.getToken(this.audience);
tokenType = TokenType.CbsTokenTypeSas;
// renew sas token in every 45 minutess
this._tokenTimeout = (3600 - 900) * 1000;
} else {
const aadToken = await this._context.namespace.tokenCredential.getToken(Constants.aadServiceBusScope);
if (!aadToken) {
throw new Error(`Failed to get token from the provided "TokenCredential" object`);
}
tokenObject = aadToken;
protected async _negotiateClaim(setTokenRenewal?: boolean): Promise {
// Acquire the lock and establish a cbs session if it does not exist on the connection.
// Although node.js is single threaded, we need a locking mechanism to ensure that a
// race condition does not happen while creating a shared resource (in this case the
// cbs session, since we want to have exactly 1 cbs session per connection).
logger.verbose(
"[%s] Acquiring cbs lock: '%s' for creating the cbs session while creating the %s: " +
"'%s' with address: '%s'.",
this._context.connectionId,
this._context.cbsSession.cbsLock,
this._type,
this.name,
this.address
);
await defaultLock.acquire(this._context.cbsSession.cbsLock, () => {
return this._context.cbsSession.init();
});
let tokenObject: AccessToken;
let tokenType: TokenType;
if (this._context.tokenCredential instanceof SharedKeyCredential) {
tokenObject = this._context.tokenCredential.getToken(this.audience);
tokenType = TokenType.CbsTokenTypeSas;
// renew sas token in every 45 minutess
this._tokenTimeoutInMs = (3600 - 900) * 1000;
} else {
const aadToken = await this._context.tokenCredential.getToken(Constants.aadEventHubsScope);
if (!aadToken) {
throw new Error(`Failed to get token from the provided "TokenCredential" object`);
}
tokenObject = aadToken;
tokenType = TokenType.CbsTokenTypeJwt;
} else {
const state: any = {
wasCloseInitiated: wasCloseInitiated,
senderError: senderError,
_sender: this._sender
};
logger.verbose(
"[%s] Something went wrong. State of sender '%s' with address '%s' is: %O",
this._context.connectionId,
this.name,
this.address,
state
);
}
if (shouldReopen) {
await defaultLock.acquire(this.senderLock, () => {
const options: AwaitableSenderOptions = this._createSenderOptions(
Constants.defaultOperationTimeoutInMs,
true
);
// shall retry forever at an interval of 15 seconds if the error is a retryable error
// else bail out when the error is not retryable or the oepration succeeds.
const config: RetryConfig = {
operation: () => this._init(options),
connectionId: this._context.connectionId,
operationType: RetryOperationType.senderLink,
connectionHost: this._context.config.host,
retryOptions: {
maxRetries: Constants.defaultMaxRetriesForConnection,
retryDelayInMs: 15000
}
};
logger.verbose(
"[%s] %s: calling negotiateClaim for audience '%s'.",
this._context.connectionId,
this._type,
this.audience
);
// Acquire the lock to negotiate the CBS claim.
logger.verbose(
"[%s] Acquiring cbs lock: '%s' for cbs auth for %s: '%s' with address '%s'.",
this._context.connectionId,
this._context.negotiateClaimLock,
this._type,
this.name,
this.address
);
await defaultLock.acquire(this._context.negotiateClaimLock, () => {
return this._context.cbsSession.negotiateClaim(this.audience, tokenObject, tokenType);
});
logger.verbose(
"[%s] Negotiated claim for %s '%s' with with address: %s",
this._context.connectionId,
this._type,
this.name,
this.address
);
if (setTokenRenewal) {
await this._ensureTokenRenewal();
}
}
this._type,
this.audience
);
// Acquire the lock to negotiate the CBS claim.
log.link(
"[%s] Acquiring cbs lock: '%s' for cbs auth for %s: '%s' with address '%s'.",
this._context.namespace.connectionId,
this._context.namespace.negotiateClaimLock,
this._type,
this.name,
this.address
);
if (!tokenObject) {
throw new Error("Token cannot be null");
}
await defaultLock.acquire(this._context.namespace.negotiateClaimLock, () => {
return this._context.namespace.cbsSession.negotiateClaim(this.audience, tokenObject, tokenType);
});
log.link(
"[%s] Negotiated claim for %s '%s' with with address: %s",
this._context.namespace.connectionId,
this._type,
this.name,
this.address
);
if (setTokenRenewal) {
await this._ensureTokenRenewal();
}
}
actionAfterTimeout,
getRetryAttemptTimeoutInMs(options.retryOptions)
);
if (!this.isOpen()) {
logger.verbose(
"Acquiring lock %s for initializing the session, sender and " +
"possibly the connection.",
this.senderLock
);
try {
const senderOptions = this._createSenderOptions(
getRetryAttemptTimeoutInMs(options.retryOptions)
);
await defaultLock.acquire(this.senderLock, () => {
return this._init(senderOptions);
});
} catch (err) {
removeListeners();
err = translate(err);
logger.warning(
"[%s] An error occurred while creating the sender %s",
this._context.connectionId,
this.name,
err
);
logErrorStackTrace(err);
return reject(err);
}
}