Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
test('invalidAccessKeyGivesError', function (done) {
azure.configure('testInvalidAccessKeyGivesError', function (c) {
c.serviceBus({
namespace: process.env['AZURE_SERVICEBUS_NAMESPACE'],
key: 'key'
});
});
var serviceBusService = azureSb.createServiceBusService(azure.config('testInvalidAccessKeyGivesError'));
suiteUtil.setupService(serviceBusService);
// fails, with an error on the callback.
serviceBusService.createTopicIfNotExists('Topic', function(error) {
assert.notEqual(error, null);
assert.equal(error.code, '401');
done();
});
});
throw error;
}
const insights = app.settings.appInsightsClient;
if (!insights) {
throw new Error('No app insights client available');
}
const firehoseConfig = config.webJob.firehose;
if (!firehoseConfig) {
throw new Error('No firehose configuration');
}
const serviceBusConfig = firehoseConfig.serviceBus;
if (!serviceBusConfig || !serviceBusConfig.connectionString || !serviceBusConfig.queue) {
throw new Error('No service bus queue configuration for the firehose webjob');
}
// NOTE: this architecture moved from topics to queues in 2019. It is still using the very old library.
const serviceBusService = serviceBus.createServiceBusService(serviceBusConfig.connectionString);
// let parallelism = messagesInQueue > maxParallelism / 2 ? maxParallelism : Math.min(5, maxParallelism);
let parallelism = maxParallelism;
console.log(`Parallelism for this run will be ${parallelism} logical threads`);
// const insights = app.settings.appInsightsClient;
insights.trackEvent({
name: 'JobFirehoseStarted',
properties: {
hostname: os.hostname(),
queue: serviceBusConfig.queue,
subscription: serviceBusConfig.subscriptionName,
// messagesInQueue: messagesInQueue.toString(),
//deadLetters: deadLetters.toString(),
},
});
//insights.trackMetric({ name: 'FirehoseMessagesInQueue', value: messagesInQueue });
//insights.trackMetric({ name: 'FirehoseDeadLetters', value: deadLetters });
test('connectionStringsWithSbSchema', function (done) {
var key = 'AhlzsbLRkjfwObuqff3xrhB2yWJNh1EMptmcmxFJ6fvPTVX3PZXwrG2YtYWf5DPMVgNsteKStM5iBLlknYFVoA==';
var connectionString = 'Endpoint=sb://ablal-martvue.servicebus.windows.net/;StsEndpoint=https://ablal-martvue-sb.accesscontrol.windows.net;SharedSecretIssuer=owner;SharedSecretValue=' + key;
var serviceBusService = azureSb.createServiceBusService(connectionString);
suiteUtil.setupService(serviceBusService);
assert.equal(serviceBusService.host, 'ablal-martvue.servicebus.windows.net');
assert.equal(serviceBusService.protocol, 'https:');
assert.equal(serviceBusService.authenticationProvider.issuer, 'owner');
assert.equal(serviceBusService.authenticationProvider.accessKey, key);
assert.equal(serviceBusService.authenticationProvider.acsHost, 'https://ablal-martvue-sb.accesscontrol.windows.net');
done();
});
});
test('connectionStrings', function (done) {
var key = 'AhlzsbLRkjfwObuqff3xrhB2yWJNh1EMptmcmxFJ6fvPTVX3PZXwrG2YtYWf5DPMVgNsteKStM5iBLlknYFVoA==';
var connectionString = 'Endpoint=http://ablal-martvue.servicebus.windows.net/;StsEndpoint=https://ablal-martvue-sb.accesscontrol.windows.net;SharedSecretIssuer=owner;SharedSecretValue=' + key;
var serviceBusService = azureSb.createServiceBusService(connectionString);
suiteUtil.setupService(serviceBusService);
assert.equal(serviceBusService.host, 'ablal-martvue.servicebus.windows.net');
assert.equal(serviceBusService.authenticationProvider.issuer, 'owner');
assert.equal(serviceBusService.authenticationProvider.accessKey, key);
assert.equal(serviceBusService.authenticationProvider.acsHost, 'https://ablal-martvue-sb.accesscontrol.windows.net');
done();
});
test('storageConnectionStringsEndpointHttpsExplicit', function (done) {
var topicName = testutil.generateId(topicNamesPrefix, topicNames, suiteUtil.isMocked);
var expectedNamespace = process.env[ServiceClientConstants.EnvironmentVariables.AZURE_SERVICEBUS_NAMESPACE];
var expectedKey = process.env[ServiceClientConstants.EnvironmentVariables.AZURE_SERVICEBUS_ACCESS_KEY];
var expectedHost = 'https://' + process.env[ServiceClientConstants.EnvironmentVariables.AZURE_SERVICEBUS_NAMESPACE] + '.servicebus.windows.net';
var serviceBusService = azureSb.createServiceBusService(expectedNamespace, expectedKey, undefined, undefined, expectedHost);
suiteUtil.setupService(serviceBusService);
serviceBusService.createTopic(topicName, function (err) {
assert.equal(err, null);
assert.equal(serviceBusService.host, process.env[ServiceClientConstants.EnvironmentVariables.AZURE_SERVICEBUS_NAMESPACE] + '.servicebus.windows.net');
assert.equal(serviceBusService.port, 443);
assert.equal(serviceBusService.authenticationProvider.issuer, 'owner');
assert.equal(serviceBusService.authenticationProvider.accessKey, expectedKey);
assert.equal(serviceBusService.authenticationProvider.acsHost, 'https://' + process.env[ServiceClientConstants.EnvironmentVariables.AZURE_SERVICEBUS_NAMESPACE] + '-sb.accesscontrol.windows.net:443');
done();
});
});
before(function (done) {
sandbox = sinon.sandbox.create();
service = azureSb.createServiceBusService()
.withFilter(new azure.ExponentialRetryPolicyFilter());
suiteUtil = notificationhubstestutil.createNotificationHubsTestUtils(service, testPrefix);
suiteUtil.setupSuite(done);
});
suiteSetup(function (done) {
serviceBusService = azureSb.createServiceBusService()
.withFilter(new azure.ExponentialRetryPolicyFilter());
suiteUtil = servicebustestutil.createServiceBusTestUtils(serviceBusService, testPrefix);
suiteUtil.setupSuite(done);
});
var idx = 0
function sendMessages (sbService, queueName) {
var msg = 'Message # ' + (++idx)
sbService.sendQueueMessage(queueName, msg, function (err) {
if (err) {
console.log('Failed Tx: ', err)
} else {
console.log('Sent ' + msg)
}
})
}
const queueName = 'sbqtest'
console.log(`Connecting to queue ${queueName}`)
var sbService = azure.createServiceBusService()
sbService.createQueueIfNotExists(queueName, function (err) {
if (err) {
console.log('Failed to create queue: ', err)
} else {
setInterval(checkForMessages.bind(null, sbService, queueName, processMessage.bind(null, sbService)), 5000)
setInterval(sendMessages.bind(null, sbService, queueName), 15000)
}
})
constructor(amqpUrl, managementEndpoint) {
this.amqpUrl = amqpUrl;
this.managementEndpoint = managementEndpoint;
this.client = null;
const retryOperations = new serviceBus.ExponentialRetryPolicyFilter();
this.serviceBusService = serviceBus.createServiceBusService(managementEndpoint).withFilter(retryOperations);
}
this.publish = async (val) => {
const client = azuresb.createServiceBusService(namespace.defaultPrimaryConnectionString.get());
await new Promise((resolve, reject) => {
client.sendTopicMessage(topic.name.get(), JSON.stringify(val), (err, res) => {
if (err) {
return reject(err);
}
return resolve(res);
});
});
};