Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
console.log("msg group (batch size) - %d", msgGroup);
console.log("msg size - %d", msgSize);
console.log("iterations - %s", iterationValue);
console.log("wait time between iterations - %d", wait);
let clients: EventHubClient[] = [];
for (let c = 0; c < clientPool; c++) {
if (partitionId != undefined) {
const partitionIdRange = partitionId.split("-").map((x) => Number.parseInt(x));
partitionIds = Array.from(
new Array(partitionIdRange[partitionIdRange.length - 1] - partitionIdRange[0] + 1),
(val, index) => index + partitionIdRange[0]);
log("[Client-%d] Sending messages to partitionId: ", c, partitionIds);
} else {
log("[Client-%d] Sending messages in a round robin fashion to all the partitions.", c);
}
clients.push(EventHubClient.createFromConnectionString(connectionString, argv.hub));
}
const msgBody = Buffer.from("Z".repeat(msgSize));
const obj: EventData = { body: msgBody };
let datas: EventData[] = [];
let count = 0;
if (msgGroup > 1) { // send batch
for (count = 0; count < msgGroup; count++) {
datas.push(obj);
}
}
const msgToSend: EventData | EventData[] = datas.length ? datas : obj;
const clientSendMessage = async (client: EventHubClient, index: number, partitionId?: string | number) => {
try {
for (let i = 0; i < iterationValue; i++) {
const startTime = Date.now();
async function main(): Promise {
const client = EventHubClient.createFromConnectionString(connectionString, eventHubName);
const partitionIds = await client.getPartitionIds();
const messageCount = 300;
const events: EventData[] = [];
// NOTE: For receiving events from Azure Stream Analytics, please send Events to an EventHub
// where the body is a JSON object/array.
// const events = [
// { body: { "message": "Hello World 1" }, applicationProperties: { id: "Some id" }, partitionKey: "pk786" },
// { body: { "message": "Hello World 2" } },
// { body: { "message": "Hello World 3" } }
// ];
for (let i = 0; i < messageCount; i++) {
events.push({ body: `Hello foo ${i}` });
}
console.log("Sending batch events...");
const consumerGroup: string = argv.consumer;
const maxWaitTime: number = argv.maxwait;
console.log("consumer group - %s", consumerGroup);
console.log("partitionId - %s", partitionId);
console.log("max wait time (seconds) - %d", maxWaitTime);
let client1: EventHubClient;
let client2: EventHubClient;
let connectionString = argv.connStr;
if (!connectionString) {
let address = argv.address;
if (!address.endsWith("/")) address += "/";
if (!address.startsWith("sb://")) address = "sb://" + address;
connectionString = `Endpoint=${address};SharedAccessKeyName=${argv.keyName};SharedAccessKey=${argv.key}`;
}
client1 = EventHubClient.createFromConnectionString(connectionString, argv.hub);
client2 = EventHubClient.createFromConnectionString(connectionString, argv.hub);
log(`Created Receiver: for partition: "${partitionId}" in consumer group: "${consumerGroup}" in event hub "${argv.hub}".`);
log(`Created Sender: for partition: "${partitionId}" in event hub "${argv.hub}".`);
const onMessage = (m: EventData) => {
const mid = m.properties!.message_id as string;
const num = m.sequenceNumber;
if (mid && !cache[mid]) {
const msg = `Error message with seq num ${num} and id '${mid}' not found in cache..`;
log(">>>> %o", new Error(msg));
} else {
log("Received message with seq num %d and id '%s' and it is present in cache.", num, mid);
delete cache[mid];
}
};
const onError = (err: any) => {
log("^^^^^^^^^^ An error occured with the receiver: %o", err);
};
const partitionId: string = argv.partitionId;
const consumerGroup: string = argv.consumer;
const maxWaitTime: number = argv.maxwait;
console.log("consumer group - %s", consumerGroup);
console.log("partitionId - %s", partitionId);
console.log("max wait time (seconds) - %d", maxWaitTime);
let client1: EventHubClient;
let client2: EventHubClient;
let connectionString = argv.connStr;
if (!connectionString) {
let address = argv.address;
if (!address.endsWith("/")) address += "/";
if (!address.startsWith("sb://")) address = "sb://" + address;
connectionString = `Endpoint=${address};SharedAccessKeyName=${argv.keyName};SharedAccessKey=${argv.key}`;
}
client1 = EventHubClient.createFromConnectionString(connectionString, argv.hub);
client2 = EventHubClient.createFromConnectionString(connectionString, argv.hub);
log(`Created Receiver: for partition: "${partitionId}" in consumer group: "${consumerGroup}" in event hub "${argv.hub}".`);
log(`Created Sender: for partition: "${partitionId}" in event hub "${argv.hub}".`);
const onMessage = (m: EventData) => {
const mid = m.properties!.message_id as string;
const num = m.sequenceNumber;
if (mid && !cache[mid]) {
const msg = `Error message with seq num ${num} and id '${mid}' not found in cache..`;
log(">>>> %o", new Error(msg));
} else {
log("Received message with seq num %d and id '%s' and it is present in cache.", num, mid);
delete cache[mid];
}
};
const onError = (err: any) => {
log("^^^^^^^^^^ An error occured with the receiver: %o", err);
const storageStr: string = argv.storageStr;
const hub: string = argv.hub;
const ephCount: number = argv.hosts;
const leaseContainerName = argv.leaseContainerName;
const hostPrefix = argv.hostPrefix;
const leaseDuration = argv.leaseDuration;
const leaseRenewInterval = argv.leaseRenewInterval;
if (!connectionString) {
let address = argv.address;
if (!address.endsWith("/")) address += "/";
if (!address.startsWith("sb://")) address = "sb://" + address;
connectionString = `Endpoint=${address};SharedAccessKeyName=${argv.keyName};SharedAccessKey=${argv.key}`;
}
const client = EventHubClient.createFromConnectionString(connectionString, hub);
const ids = await client.getPartitionIds();
await client.close();
log("Total number of partitions: %d", ids.length);
log("Creating %d EPH(s).", ephCount);
for (let i = 0; i < ephCount; i++) {
const hostName = `${hostPrefix}-${i + 1}`;
ephCache[hostName] = EventProcessorHost.createFromConnectionString(hostName,
storageStr,
leaseContainerName,
connectionString,
{
leaseDuration: leaseDuration,
leaseRenewInterval: leaseRenewInterval,
eventHubPath: hub,
consumerGroup: consumerGroup,
initialOffset: offset === "-1" ? EventPosition.fromOffset("-1") : undefined,
try {
validateArgs(argv);
let partitionIds = argv.partitions;
const consumerGroup = argv.consumer;
const offset = argv.offset;
const duration = argv.duration;
let client: EventHubClient;
let connectionString = argv.connStr;
if (!connectionString) {
let address = argv.address;
if (!address.endsWith("/")) address += "/";
if (!address.startsWith("sb://")) address = "sb://" + address;
connectionString = `Endpoint=${address};SharedAccessKeyName=${argv.keyName};SharedAccessKey=${argv.key}`;
}
client = EventHubClient.createFromConnectionString(connectionString, argv.hub);
if (!partitionIds) {
partitionIds = await client.getPartitionIds();
}
log("PartitionIds in the eventhub '%s' are: ", argv.hub, partitionIds);
startTime = Date.now();
log("Start time for receiving messages is: %s", startTime);
if (duration) {
log(">>>>>>>>>>>> Performance benchmark mode. <<<<<<<<<<<<<<<<");
log("Will be receiving messages only from partition: '0'.");
log(`Created Receiver for partition: "0" in consumer group: "${consumerGroup}" in event hub "${argv.hub}".`);
let datas = await client.receiveBatch("0", 500000, duration, { consumerGroup: consumerGroup, eventPosition: EventPosition.fromOffset(offset, true) });
log(`Received ${datas.length} messages in ${duration} seconds @ ${Math.floor(datas.length / duration)} messages/second.`);
} else {
for (let id of partitionIds) {
log(`Created Receiver: for partition: "${id}" in consumer group: "${consumerGroup}" in event hub "${argv.hub}".`);
const initialTS = Date.now();
async function main(): Promise {
const client = EventHubClient.createFromConnectionString(connectionString, eventHubsName);
const info = await client.getHubRuntimeInformation();
console.log("RuntimeInfo: ", info);
const pInfo = await client.getPartitionInformation(info.partitionIds[0]);
console.log("Partition Information: ", pInfo);
await client.close();
}
ctxt.getEventHubClient = () => {
if (ctxt.tokenProvider) {
return EventHubClient.createFromTokenProvider(
ctxt.connectionConfig.host,
ctxt.eventHubPath,
ctxt.tokenProvider,
{ userAgent: ctxt.userAgent }
);
} else {
return EventHubClient.createFromConnectionString(ctxt.eventHubConnectionString, ctxt.eventHubPath, {
userAgent: ctxt.userAgent
});
}
};
ctxt.getHubRuntimeInformation = async () => {