Secure your code as it's written. Use Snyk Code to scan source code in minutes — no build needed — and fix issues immediately.
constructor(conf: any, topicConf: any = {}, options: Options = {}) {
this.dying = false
this.dead = false
this.topics = []
conf['auto.commit.interval.ms'] =
conf['auto.commit.interval.ms'] || DEFAULT_AUTO_COMMIT_INTERVAL
if (!conf['rebalance_cb']) {
conf['rebalance_cb'] = (err: any, assignment: any) => {
if (err.code === ErrorCode.ERR__ASSIGN_PARTITIONS) {
this.consumer.assign(assignment)
let rebalanceLog = 'consumer rebalance : '
for (const assign of assignment) {
rebalanceLog += `{topic ${assign.topic}, partition: ${assign.partition}} `
}
this.logger.info(rebalanceLog)
} else if (err.code === ErrorCode.ERR__REVOKE_PARTITIONS) {
this.consumer.unassign()
} else {
this.logger.error(err)
}
constructor(conf: any, topicConf: any = {}, options: Options = {}) {
this.dying = false
this.dead = false
this.topics = []
conf['auto.commit.interval.ms'] =
conf['auto.commit.interval.ms'] || DEFAULT_AUTO_COMMIT_INTERVAL
if (!conf['rebalance_cb']) {
conf['rebalance_cb'] = (err: any, assignment: any) => {
if (err.code === ErrorCode.ERR__ASSIGN_PARTITIONS) {
this.consumer.assign(assignment)
let rebalanceLog = 'consumer rebalance : '
for (const assign of assignment) {
rebalanceLog += `{topic ${assign.topic}, partition: ${assign.partition}} `
}
this.logger.info(rebalanceLog)
} else if (err.code === ErrorCode.ERR__REVOKE_PARTITIONS) {
this.consumer.unassign()
} else {
this.logger.error(err)
}
}