Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
// Fetch the hub's partition ids, close the client, and report the totals.
const ids = await client.getPartitionIds();
await client.close();
log("Total number of partitions: %d", ids.length);
log("Creating %d EPH(s).", ephCount);

// Create `ephCount` hosts named "<hostPrefix>-1", "<hostPrefix>-2", ... and
// cache each one under its host name.
for (let hostIndex = 0; hostIndex < ephCount; hostIndex++) {
  const hostName = `${hostPrefix}-${hostIndex + 1}`;
  ephCache[hostName] = EventProcessorHost.createFromConnectionString(
    hostName,
    storageStr,
    leaseContainerName,
    connectionString,
    {
      leaseDuration,
      leaseRenewInterval,
      eventHubPath: hub,
      consumerGroup,
      // An explicit start position is supplied only when the offset is "-1";
      // for any other offset the option is left undefined.
      initialOffset: offset === "-1" ? EventPosition.fromOffset("-1") : undefined,
      // Host-level errors are only logged, tagged with the owning host name.
      onEphError: (error) => {
        log(">>>>>>> [%s] Error: %O", hostName, error);
      }
    }
  );
}
const startedEphs: Array> = [];
for (let ephName of Object.keys(ephCache)) {
// Message handler
const eph = ephCache[ephName];
let count: number = 0;
const onMessage: OnReceivedMessage = async (context: PartitionContext, data: EventData) => {
count++;
// log("##### [%s] %d - Rx message from partition '%s'.", eph.hostName, count, context.partitionId);
// Checkpointing every 1000th event
if (count % 1000 === 0) {
try {
if (partitionCount[id].timer == undefined) {
partitionCount[id].prevTimestamp = ts;
partitionCount[id].timer = setInterval(messageRate, logFrequency);
}
if (argv.fullEventData) {
log("Corresponding EventData object: %o", m);
}
};
/**
 * Error callback handed to the receiver: cancels the interval timer stored
 * for partition `id` (when one is set) and then logs the error.
 *
 * @param err - The error reported by the receiver; it is only logged.
 */
const onError = (err: any) => {
  const partitionStats = partitionCount[id];
  const activeTimer = partitionStats.timer;
  if (activeTimer != undefined) {
    // Clear the stored handle so the timer is not treated as still running.
    clearInterval(activeTimer as NodeJS.Timer);
    partitionStats.timer = undefined;
  }
  log("^^^^^^^^^^ An error occured with the receiver: %o", err);
};
client.receive(id, onMessage, onError, { consumerGroup: consumerGroup, eventPosition: EventPosition.fromOffset(offset, true) });
}
}
log("Started receiving messages from offset: '%s'.", offset);
} catch (err) {
return Promise.reject(err);
}
}
);
// Decide which EventPosition to start from, log it, and return it.
// NOTE(review): `startingCheckpoint` is defined outside this span — presumably
// the checkpoint stored for this partition; confirm against the enclosing method.
const withHostAndPartiton = this._context.withHostAndPartition;
let result: EventPosition;
if (!startingCheckpoint) {
// No checkpoint available: log the user-provided initial offset when there is
// one, then use it; otherwise build a position from the current `_offset`.
if (this._context.initialOffset) {
log.partitionContext(
withHostAndPartiton(this, "User provided initial offset: %s"),
this._context.initialOffset.getExpression()
);
}
result = this._context.initialOffset || EventPosition.fromOffset(this._offset);
} else {
// A checkpoint exists: copy whichever of its offset / sequence number are set
// onto this instance, then start from the (possibly updated) offset.
if (startingCheckpoint.offset != undefined) this._offset = startingCheckpoint.offset;
if (startingCheckpoint.sequenceNumber != undefined)
this._sequenceNumber = startingCheckpoint.sequenceNumber;
result = EventPosition.fromOffset(this._offset);
log.partitionContext(
withHostAndPartiton(this, "Retrieved starting offset/sequence " + "number: %s/%d"),
this._offset,
this._sequenceNumber
);
}
// Log the fields of the chosen position regardless of which branch produced it.
log.partitionContext(
withHostAndPartiton(
this,
"Initial position provider offset: %s, " + "sequenceNumber: %d, enqueuedTime: %d"
),
result.offset,
result.sequenceNumber,
result.enqueuedTime
);
return result;
async getInitialOffset(): Promise {
const startingCheckpoint = await this._context.checkpointManager.getCheckpoint(
this.partitionId
);
const withHostAndPartiton = this._context.withHostAndPartition;
let result: EventPosition;
if (!startingCheckpoint) {
if (this._context.initialOffset) {
log.partitionContext(
withHostAndPartiton(this, "User provided initial offset: %s"),
this._context.initialOffset.getExpression()
);
}
result = this._context.initialOffset || EventPosition.fromOffset(this._offset);
} else {
if (startingCheckpoint.offset != undefined) this._offset = startingCheckpoint.offset;
if (startingCheckpoint.sequenceNumber != undefined)
this._sequenceNumber = startingCheckpoint.sequenceNumber;
result = EventPosition.fromOffset(this._offset);
log.partitionContext(
withHostAndPartiton(this, "Retrieved starting offset/sequence " + "number: %s/%d"),
this._offset,
this._sequenceNumber
);
}
log.partitionContext(
withHostAndPartiton(
this,
"Initial position provider offset: %s, " + "sequenceNumber: %d, enqueuedTime: %d"
),