Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
console.log("msg group (batch size) - %d", msgGroup);
console.log("msg size - %d", msgSize);
console.log("iterations - %s", iterationValue);
console.log("wait time between iterations - %d", wait);
let clients: EventHubClient[] = [];
for (let c = 0; c < clientPool; c++) {
if (partitionId != undefined) {
const partitionIdRange = partitionId.split("-").map((x) => Number.parseInt(x));
partitionIds = Array.from(
new Array(partitionIdRange[partitionIdRange.length - 1] - partitionIdRange[0] + 1),
(val, index) => index + partitionIdRange[0]);
log("[Client-%d] Sending messages to partitionId: ", c, partitionIds);
} else {
log("[Client-%d] Sending messages in a round robin fashion to all the partitions.", c);
}
clients.push(EventHubClient.createFromConnectionString(connectionString, argv.hub));
}
const msgBody = Buffer.from("Z".repeat(msgSize));
const obj: EventData = { body: msgBody };
let datas: EventData[] = [];
let count = 0;
if (msgGroup > 1) { // send batch
for (count = 0; count < msgGroup; count++) {
datas.push(obj);
}
}
const msgToSend: EventData | EventData[] = datas.length ? datas : obj;
const clientSendMessage = async (client: EventHubClient, index: number, partitionId?: string | number) => {
try {
for (let i = 0; i < iterationValue; i++) {
const startTime = Date.now();
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
'use strict';
const logger = require('../lib').logger;
const config = require('../config');
const EventHubClient = require('@azure/event-hubs').EventHubClient;
const EventPosition = require('@azure/event-hubs').EventPosition;
const ServiceClient = require('azure-iothub').Client;
var serviceConnection = null;
var eventHubConnection = null;
// Called whenever an error occurs in either the message callback or the eventhub connection setup
function errorCb(err) {
logger.crit(err.message);
};
// Called whenever we receive a telemetry message from the client
function messageReceivedCb(message) {
logger.debug('Service successfully received telemetry from client.');
logger.trace(JSON.stringify(message));
var targetDevice = message.body.toString();
if(!targetDevice) {
const ids = await client.getPartitionIds();
await client.close();
log("Total number of partitions: %d", ids.length);
log("Creating %d EPH(s).", ephCount);
for (let i = 0; i < ephCount; i++) {
const hostName = `${hostPrefix}-${i + 1}`;
ephCache[hostName] = EventProcessorHost.createFromConnectionString(hostName,
storageStr,
leaseContainerName,
connectionString,
{
leaseDuration: leaseDuration,
leaseRenewInterval: leaseRenewInterval,
eventHubPath: hub,
consumerGroup: consumerGroup,
initialOffset: offset === "-1" ? EventPosition.fromOffset("-1") : undefined,
onEphError: (error) => {
log(">>>>>>> [%s] Error: %O", hostName, error);
}
});
}
const startedEphs: Array> = [];
for (let ephName of Object.keys(ephCache)) {
// Message handler
const eph = ephCache[ephName];
let count: number = 0;
const onMessage: OnReceivedMessage = async (context: PartitionContext, data: EventData) => {
count++;
// log("##### [%s] %d - Rx message from partition '%s'.", eph.hostName, count, context.partitionId);
// Checkpointing every 1000th event
if (count % 1000 === 0) {
try {
async function doInit() {
const IOT_HUB_CONNECTION_STRING = getEnvironmentVariable('IOT_HUB_CONNECTION_STRING');
registry = Registry.fromConnectionString(IOT_HUB_CONNECTION_STRING);
const client = await EventHubClient.createFromIotHubConnectionString(IOT_HUB_CONNECTION_STRING);
const hubInfo = await client.getHubRuntimeInformation();
console.log(`Connected to IoT Hub at ${hubInfo.path}`);
client.receive(
'1',
(eventData) => { // on 'message
if (eventData.annotations) {
const enqueuedTime = eventData.annotations['x-opt-enqueued-time'];
console.debug(`Received message from IoT Hub, enqueued at ${enqueuedTime}`);
} else {
console.debug(`Received message from IoT Hub`);
}
updateState(eventData.body);
},
(error) => {
console.error(`Error receiving message from Event Hubs: ${error}`);
client.receive(
'1',
(eventData) => { // on 'message
if (eventData.annotations) {
const enqueuedTime = eventData.annotations['x-opt-enqueued-time'];
console.debug(`Received message from IoT Hub, enqueued at ${enqueuedTime}`);
} else {
console.debug(`Received message from IoT Hub`);
}
updateState(eventData.body);
},
(error) => {
console.error(`Error receiving message from Event Hubs: ${error}`);
},
{
eventPosition: EventPosition.fromEnqueuedTime(Date.now())
}
);
}
partitionIds.forEach(async (partitionId: string) => {
const receiveOptions = {
consumerGroup,
enableReceiverRuntimeMetric: true,
eventPosition: EventPosition.fromEnqueuedTime(startTime),
name: `${hubInfo.path}_${partitionId}`,
};
let receiver: ReceiveHandler;
try {
receiver = eventHubClient.receive(
partitionId,
onMessage,
(err: object) => {
console.log(err); // tslint:disable-line: no-console
},
receiveOptions);
receivers.push(receiver);
await delay(SERVER_WAIT).then(() => {
receiver.stop().catch(err => {
console.log(`couldn't stop receiver on partition[${partitionId}]: ${err}`); // tslint:disable-line: no-console
});
const run = async() => {
try {
console.log('Initializing Module Client.');
const moduleClient = ModuleClient.fromConnectionString(process.env.DEVICE_CONNECTION_STRING, Mqtt);
// We use the iot hub connection string to create the client rather than the actual event hub end point
console.log('Initializing Event Hub Client.');
const eventHubClient = await EventHubClient.createFromIotHubConnectionString(process.env.IOTHUB_CONNECTION_STRING);
console.log('Initializing HTTP Device Client.');
const httpClient = HttpClientFromConnectionString(process.env.DEVICE_CONNECTION_STRING);
console.log('Initialized clients!');
console.log('Getting information from Event Hub.');
const partitionIds = await eventHubClient.getPartitionIds(); //read more about partitions https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features#partitions
const startingPosition = EventPosition.fromEnqueuedTime(Date.now() - 1000); //subtracting a second to account for delay
const eventLabel = 'beepBoop'; //label our events
const sendDummyMessage = m => moduleClient.sendOutputEvent(eventLabel, m); //send helper
console.log('Creating messages to send to via Module Client');
//generate some messages
const messageCount = 10;
const importantMessages = [...Array(messageCount).keys()].map(generateImportantMessage);
// - The device can add arbitrary application properties to the message
// - IoT Hub adds system properties, such as Device Id, to the message.
var printMessage = function (message) {
console.log('Telemetry received: ');
console.log(JSON.stringify(message.body));
console.log('Application properties (set by device): ')
console.log(JSON.stringify(message.applicationProperties));
console.log('System properties (set by IoT Hub): ')
console.log(JSON.stringify(message.annotations));
console.log('');
};
// Connect to the partitions on the IoT Hub's Event Hubs-compatible endpoint.
// This example only reads messages sent after this application started.
var ehClient;
EventHubClient.createFromIotHubConnectionString(connectionString).then(function (client) {
console.log("Successfully created the EventHub Client from iothub connection string.");
ehClient = client;
return ehClient.getPartitionIds();
}).then(function (ids) {
console.log("The partition ids are: ", ids);
return ids.map(function (id) {
return ehClient.receive(id, printMessage, printError, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
});
}).catch(printError);
const client = new EventHubClient(connectionString, eventHubName);
const partitionIds = await client.getPartitionIds();
const consumer = client.createConsumer("$Default", partitionIds[0], EventPosition.earliest());
const onMessageHandler: OnMessage = (brokeredMessage: EventData) => {
console.log(`Received event: ${brokeredMessage.body}`);
};
const onErrorHandler: OnError = (err: MessagingError | Error) => {
console.log("Error occurred: ", err);
};
try {
const rcvHandler = consumer.receive(onMessageHandler, onErrorHandler);
// Waiting long enough before closing the consumer to receive event
await delay(5000);
await rcvHandler.stop();
} finally {
await client.close();
}
}
console.log(`Received event: ${message.body}`);
}
};
const subscription = client.subscribe(consumerGroupName, onReceivedEventsHandler,
// for simplicity we'll just target a single partition for our demo
partitionIds[0], {
onError: async (err: Error, partitionContext: PartitionContext) => {
console.log(`Error occurred in the subscription for ${partitionContext.partitionId}: ${err}`);
},
// if this subscription happens tob e the first
defaultEventPosition: EventPosition.earliest()
});
// Waiting long enough before closing the consumer to receive event
await delay(5000);
await subscription.close();
await client.close();
}