Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
console.log(
`Received event: '${event.body}' from partition: '${context.partitionId}' and consumer group: '${context.consumerGroupName}'`
);
}
// checkpoint using the last event in the batch
const lastEvent = events[events.length - 1];
// An empty batch leaves `lastEvent` undefined; there is nothing to checkpoint
// then, and reading `lastEvent.sequenceNumber` would throw.
if (lastEvent !== undefined) {
  try {
    await context.updateCheckpoint(lastEvent);
    // Logged only after the checkpoint call has resolved, so a failed
    // checkpoint is never reported as a success.
    console.log(
      `Successfully checkpointed event with sequence number: ${lastEvent.sequenceNumber} from partition: '${context.partitionId}'`
    );
  } catch (err) {
    // Best-effort, as in the original: report the failure and keep going
    // rather than rethrowing.
    console.log(`Error when checkpointing on partition ${context.partitionId}: `, err);
  }
}
}
// Start receiving: `processEvents` is registered as the handler, for the
// default consumer group, with a blob-backed partition manager.
const subscription = consumerClient.subscribe(
  EventHubConsumerClient.defaultConsumerGroupName,
  processEvents,
  new BlobPartitionManager(containerClient)
);
// after 30 seconds, stop processing
// A one-shot timer is used on purpose: an interval that is never cleared would
// fire again every 30 seconds and keep the process alive.
await new Promise<void>((resolve) => {
  setTimeout(resolve, 30000);
});
// Close outside the timer callback so a failing close() rejects into the
// enclosing async function instead of becoming an unhandled rejection.
await subscription.close();
await consumerClient.close();
}export async function main() {
console.log(`Running receiveEventsUsingCheckpointStore sample`);
// this client will be used by our eventhubs-checkpointstore-blob, which
// persists any checkpoints from this session in Azure Storage
const containerClient = new ContainerClient(storageConnectionString, containerName);
if (!(await containerClient.exists())) {
await containerClient.create();
}
const checkpointStore : CheckpointStore = new BlobCheckpointStore(containerClient);
const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName, checkpointStore);
const subscription = consumerClient.subscribe({
processEvents: async (events, context) => {
for (const event of events) {
console.log(`Received event: '${event.body}' from partition: '${context.partitionId}' and consumer group: '${context.consumerGroup}'`);
}
try {
// save a checkpoint for the last event now that we've processed this batch.
await context.updateCheckpoint(events[events.length - 1]);
} catch (err) {
console.log(`Error when checkpointing on partition ${context.partitionId}: `, err);
throw err;
};