Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
function consume(subject, channel, reply) {
const readerKey = `${subject}.${channel}`
// only one reader per topic channel combination
if (readers.has(readerKey)) {
return reply()
}
// if not exist, create Reader instance
const reader = new Nsq.Reader(subject, channel, opts.nsqReader)
reader.connect()
reader.on(Nsq.Reader.NSQD_CONNECTED, (host, port) => {
hemera.log.info('NSQ Reader connected to %s:%s', host, port)
reply()
})
reader.on(Nsq.Reader.DISCARD, msg => {
hemera.log.warn(msg, 'NSQ Message was discarded')
})
reader.on(Nsq.Reader.ERROR, err => {
hemera.log.error(err, 'NSQ Reader error')
reply(err)
// Let it crash and restart