Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
const { Kafka, logLevel } = require('kafkajs') // eslint-disable-line @typescript-eslint/no-var-requires
/**
 * Signature of a one-argument callback whose return value is ignored.
 * Used as the type of `createConsumer`'s `mockConsumptionCallback` parameter.
 * NOTE(review): the name suggests a Jest mock function is passed in — confirm against callers.
 */
type JestFn = (payload: any) => void
// Retry settings handed to the KafkaJS client: up to 10 retries, with an
// initial retry time of 2500 (milliseconds, per the KafkaJS client docs).
const kafkaRetryPolicy = {
  initialRetryTime: 2500,
  retries: 10,
}

// Connection settings for the single local broker used by this example.
// `logLevel.NOTHING` is passed as the client's log level.
const kafkaClientOptions = {
  brokers: ['localhost:9092'],
  clientId: 'dockest_example',
  logLevel: logLevel.NOTHING,
  retry: kafkaRetryPolicy,
}

// Module-level KafkaJS client; `createConsumer` builds its consumer from it.
const kafka = new Kafka(kafkaClientOptions)
const createConsumer = (
mockConsumptionCallback: JestFn,
): { consumer: any; startConsuming: () => Promise; stopConsuming: () => Promise } => {
const consumer = kafka.consumer({ groupId: 'dockest_group_1' })
const startConsuming = async () => {
await consumer.connect()
await consumer.subscribe({ topic: 'dockesttopic' })
await consumer.run({
eachMessage: async ({
// Installs a two-stage logger factory on `this.serviceLogger`.
// Stage one receives `kafkalogLevel` (unused here); stage two receives a log
// entry and forwards it to the `this.logger` method matching the entry's level.
// NOTE(review): the shape matches a KafkaJS custom `logCreator` — presumably
// that is the intended use; confirm where `serviceLogger` is consumed.
this.serviceLogger = kafkalogLevel => ({ namespace, level, label, log }) => {
  // Built on demand so `log.message` is only read when a branch actually logs.
  const buildLine = () => "KAFKAJS: " + namespace + log.message;

  // ERROR and NOTHING are both reported through the error channel.
  if (level === logLevel.ERROR || level === logLevel.NOTHING) {
    return this.logger.error(buildLine(), log);
  }
  if (level === logLevel.WARN) {
    return this.logger.warn(buildLine(), log);
  }
  if (level === logLevel.INFO) {
    return this.logger.info(buildLine(), log);
  }
  if (level === logLevel.DEBUG) {
    return this.logger.debug(buildLine(), log);
  }
  // Any other level is ignored and yields undefined.
  return undefined;
}
// Installs a logger factory on `this.serviceLogger`: calling it with no
// arguments yields a handler that routes a log entry to the `this.logger`
// method matching the entry's level.
// NOTE(review): the prefix is the literal text "namespace:", not the entry's
// namespace value — confirm this is intentional.
this.serviceLogger = () => ({ level, log }) => {
  // Built on demand so `log.message` is only read when a branch actually logs.
  const buildLine = () => "namespace:" + log.message;

  // ERROR and NOTHING are both reported through the error channel.
  if (level === logLevel.ERROR || level === logLevel.NOTHING) {
    return this.logger.error(buildLine(), log);
  }
  if (level === logLevel.WARN) {
    return this.logger.warn(buildLine(), log);
  }
  if (level === logLevel.INFO) {
    return this.logger.info(buildLine(), log);
  }
  if (level === logLevel.DEBUG) {
    return this.logger.debug(buildLine(), log);
  }
  // Any other level is ignored and yields undefined.
  return undefined;
};
// Assigns `this.serviceLogger` a zero-argument factory. The handler it returns
// takes a log entry ({ level, log }) and dispatches it to `this.logger` by level.
// NOTE(review): messages are prefixed with the fixed string "namespace:" rather
// than a namespace taken from the entry — verify that this is what is wanted.
this.serviceLogger = () => ({ level, log }) => {
  // Deferred so the message text is only assembled for levels that are logged.
  const formatted = () => "namespace:" + log.message;

  // Both ERROR and NOTHING end up in `this.logger.error`.
  if (level === logLevel.ERROR || level === logLevel.NOTHING) {
    return this.logger.error(formatted(), log);
  }
  if (level === logLevel.WARN) {
    return this.logger.warn(formatted(), log);
  }
  if (level === logLevel.INFO) {
    return this.logger.info(formatted(), log);
  }
  if (level === logLevel.DEBUG) {
    return this.logger.debug(formatted(), log);
  }
  // Unrecognised levels fall through without logging.
  return undefined;
};
// Assigns `this.serviceLogger` a zero-argument factory. The handler it returns
// takes a log entry ({ namespace, level, log }) and forwards it to the
// `this.logger` method for that level, prefixing the message with "KAFKAJS: "
// followed directly by the entry's namespace (no separator is inserted).
this.serviceLogger = () => ({ namespace, level, log }) => {
  // Deferred so the message text is only assembled for levels that are logged.
  const formatted = () => "KAFKAJS: " + namespace + log.message;

  // Both ERROR and NOTHING end up in `this.logger.error`.
  if (level === logLevel.ERROR || level === logLevel.NOTHING) {
    return this.logger.error(formatted(), log);
  }
  if (level === logLevel.WARN) {
    return this.logger.warn(formatted(), log);
  }
  if (level === logLevel.INFO) {
    return this.logger.info(formatted(), log);
  }
  if (level === logLevel.DEBUG) {
    return this.logger.debug(formatted(), log);
  }
  // Unrecognised levels fall through without logging.
  return undefined;
};