Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
protected async _negotiateClaim(setTokenRenewal?: boolean): Promise {
// Acquire the lock and establish a cbs session if it does not exist on the connection.
// Although node.js is single threaded, we need a locking mechanism to ensure that a
// race condition does not happen while creating a shared resource (in this case the
// cbs session, since we want to have exactly 1 cbs session per connection).
log.link(
"[%s] Acquiring cbs lock: '%s' for creating the cbs session while creating the %s: " +
"'%s' with address: '%s'.",
this._context.namespace.connectionId,
this._context.namespace.cbsSession.cbsLock,
this._type,
this.name,
this.address
);
await defaultLock.acquire(this._context.namespace.cbsSession.cbsLock, () => {
return this._context.namespace.cbsSession.init();
});
let tokenObject: AccessToken;
let tokenType: TokenType;
if (this._context.namespace.tokenCredential instanceof SharedKeyCredential) {
tokenObject = this._context.namespace.tokenCredential.getToken(this.audience);
tokenType = TokenType.CbsTokenTypeSas;
// renew sas token in every 45 minutess
this._tokenTimeout = (3600 - 900) * 1000;
} else {
const aadToken = await this._context.namespace.tokenCredential.getToken(Constants.aadServiceBusScope);
if (!aadToken) {
throw new Error(`Failed to get token from the provided "TokenCredential" object`);
}
tokenObject = aadToken;
protected async _negotiateClaim(setTokenRenewal?: boolean): Promise {
// Acquire the lock and establish a cbs session if it does not exist on the connection.
// Although node.js is single threaded, we need a locking mechanism to ensure that a
// race condition does not happen while creating a shared resource (in this case the
// cbs session, since we want to have exactly 1 cbs session per connection).
logger.verbose(
"[%s] Acquiring cbs lock: '%s' for creating the cbs session while creating the %s: " +
"'%s' with address: '%s'.",
this._context.connectionId,
this._context.cbsSession.cbsLock,
this._type,
this.name,
this.address
);
await defaultLock.acquire(this._context.cbsSession.cbsLock, () => {
return this._context.cbsSession.init();
});
let tokenObject: AccessToken;
let tokenType: TokenType;
if (this._context.tokenCredential instanceof SharedKeyCredential) {
tokenObject = this._context.tokenCredential.getToken(this.audience);
tokenType = TokenType.CbsTokenTypeSas;
// renew sas token in every 45 minutess
this._tokenTimeoutInMs = (3600 - 900) * 1000;
} else {
const aadToken = await this._context.tokenCredential.getToken(Constants.aadEventHubsScope);
if (!aadToken) {
throw new Error(`Failed to get token from the provided "TokenCredential" object`);
}
tokenObject = aadToken;
tokenType = TokenType.CbsTokenTypeJwt;
} else {
const state: any = {
wasCloseInitiated: wasCloseInitiated,
senderError: senderError,
_sender: this._sender
};
logger.verbose(
"[%s] Something went wrong. State of sender '%s' with address '%s' is: %O",
this._context.connectionId,
this.name,
this.address,
state
);
}
if (shouldReopen) {
await defaultLock.acquire(this.senderLock, () => {
const options: AwaitableSenderOptions = this._createSenderOptions(
Constants.defaultOperationTimeoutInMs,
true
);
// shall retry forever at an interval of 15 seconds if the error is a retryable error
// else bail out when the error is not retryable or the oepration succeeds.
const config: RetryConfig = {
operation: () => this._init(options),
connectionId: this._context.connectionId,
operationType: RetryOperationType.senderLink,
connectionHost: this._context.config.host,
retryOptions: {
maxRetries: Constants.defaultMaxRetriesForConnection,
retryDelayInMs: 15000
}
};
// create RHEA receiver options
const initOptions = this._createReceiverOptions(receiverOptions);
// attempt to create the link
const linkCreationConfig: RetryConfig = {
connectionId: this._context.connectionId,
connectionHost: this._context.config.host,
operation: () => this.initialize(initOptions),
operationType: RetryOperationType.receiverLink,
retryOptions: {
maxRetries: Constants.defaultMaxRetriesForConnection,
retryDelayInMs: 15000
}
};
await retry(linkCreationConfig);
// if the receiver is in streaming mode we need to add credits again.
if (this._isStreaming) {
this._addCredit(Constants.defaultPrefetchCount);
}
} catch (err) {
logger.verbose(
"[%s] An error occurred while processing onDetached() of Receiver '%s' with address " +
"'%s': %O",
this._context.connectionId,
this.name,
this.address,
err
);
}
}
Constants.defaultOperationTimeoutInMs,
true
);
// shall retry forever at an interval of 15 seconds if the error is a retryable error
// else bail out when the error is not retryable or the oepration succeeds.
const config: RetryConfig = {
operation: () => this._init(options),
connectionId: this._context.connectionId,
operationType: RetryOperationType.senderLink,
connectionHost: this._context.config.host,
retryOptions: {
maxRetries: Constants.defaultMaxRetriesForConnection,
retryDelayInMs: 15000
}
};
return retry(config);
});
}
const linkCreationConfig: RetryConfig = {
connectionId: this._context.connectionId,
connectionHost: this._context.config.host,
operation: () => this.initialize(initOptions),
operationType: RetryOperationType.receiverLink,
retryOptions: {
maxRetries: Constants.defaultMaxRetriesForConnection,
retryDelayInMs: 15000
}
};
await retry(linkCreationConfig);
// if the receiver is in streaming mode we need to add credits again.
if (this._isStreaming) {
this._addCredit(Constants.defaultPrefetchCount);
}
} catch (err) {
logger.verbose(
"[%s] An error occurred while processing onDetached() of Receiver '%s' with address " +
"'%s': %O",
this._context.connectionId,
this.name,
this.address,
err
);
}
}
return this._context.cbsSession.init();
});
let tokenObject: AccessToken;
let tokenType: TokenType;
if (this._context.tokenCredential instanceof SharedKeyCredential) {
tokenObject = this._context.tokenCredential.getToken(this.audience);
tokenType = TokenType.CbsTokenTypeSas;
// renew sas token in every 45 minutess
this._tokenTimeoutInMs = (3600 - 900) * 1000;
} else {
const aadToken = await this._context.tokenCredential.getToken(Constants.aadEventHubsScope);
if (!aadToken) {
throw new Error(`Failed to get token from the provided "TokenCredential" object`);
}
tokenObject = aadToken;
tokenType = TokenType.CbsTokenTypeJwt;
this._tokenTimeoutInMs = tokenObject.expiresOnTimestamp - Date.now() - 2 * 60 * 1000;
}
logger.verbose(
"[%s] %s: calling negotiateClaim for audience '%s'.",
this._context.connectionId,
this._type,
this.audience
);
// Acquire the lock to negotiate the CBS claim.
logger.verbose(
"[%s] Acquiring cbs lock: '%s' for cbs auth for %s: '%s' with address '%s'.",
this._context.connectionId,
this._context.negotiateClaimLock,
this._type,
this.name,
// Define connection string and related Event Hubs entity name here
const connectionString = "";
const eventHubName = "";
// Define AZURE_TENANT_ID, AZURE_CLIENT_ID and AZURE_CLIENT_SECRET of your AAD application in your environment
const ehConnectionConfig = EventHubConnectionConfig.create(connectionString, eventHubName);
const parameters = {
config: ehConnectionConfig,
connectionProperties: {
product: "MSJSClient",
userAgent: "/js-core-amqp",
version: "0.1.0"
}
};
const connectionContext = ConnectionContextBase.create(parameters);
async function authenticate(
audience: string,
closeConnection: boolean = false
): Promise {
await connectionContext.cbsSession.init();
const credential = new DefaultAzureCredential();
const tokenObject = await credential.getToken(Constants.aadEventHubsScope);
if (!tokenObject) {
throw new Error("Aad token cannot be null");
}
const result = await connectionContext.cbsSession.negotiateClaim(
audience,
tokenObject,
TokenType.CbsTokenTypeJwt
);
this._type,
this.name,
this.address
);
await defaultLock.acquire(this._context.cbsSession.cbsLock, () => {
return this._context.cbsSession.init();
});
let tokenObject: AccessToken;
let tokenType: TokenType;
if (this._context.tokenCredential instanceof SharedKeyCredential) {
tokenObject = this._context.tokenCredential.getToken(this.audience);
tokenType = TokenType.CbsTokenTypeSas;
// renew sas token in every 45 minutess
this._tokenTimeoutInMs = (3600 - 900) * 1000;
} else {
const aadToken = await this._context.tokenCredential.getToken(Constants.aadEventHubsScope);
if (!aadToken) {
throw new Error(`Failed to get token from the provided "TokenCredential" object`);
}
tokenObject = aadToken;
tokenType = TokenType.CbsTokenTypeJwt;
this._tokenTimeoutInMs = tokenObject.expiresOnTimestamp - Date.now() - 2 * 60 * 1000;
}
logger.verbose(
"[%s] %s: calling negotiateClaim for audience '%s'.",
this._context.connectionId,
this._type,
this.audience
);
// Acquire the lock to negotiate the CBS claim.
logger.verbose(
await defaultLock.acquire(this.senderLock, () => {
const options: AwaitableSenderOptions = this._createSenderOptions(
Constants.defaultOperationTimeoutInMs,
true
);
// shall retry forever at an interval of 15 seconds if the error is a retryable error
// else bail out when the error is not retryable or the oepration succeeds.
const config: RetryConfig = {
operation: () => this._init(options),
connectionId: this._context.connectionId,
operationType: RetryOperationType.senderLink,
connectionHost: this._context.config.host,
retryOptions: {
maxRetries: Constants.defaultMaxRetriesForConnection,
retryDelayInMs: 15000
}
};
return retry(config);
});
}