Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
to(topic: string, iO: Observable): Subscription {
let client = this.getClient();
let producer = new kafka.Producer(client);
let buffer = [];
let handleErr = err => {
if (err) { throw err; }
};
producer.on('ready', () => {
if (!buffer.length) { return; }
producer.send([{
topic: topic,
messages: buffer
}], handleErr);
});
return iO
// Things run faster overall with a little buffering
kit('listens to a kafka topic', (done) => {
let recieved = 0;
let topic = 'gustavTest-listen';
let producer = new kafka.Producer(client);
producer.on('ready', () => {
producer.send([{
topic,
messages: ['hello']
}], handleErr);
});
producer.on('err', handleErr);
let kafObservable = gr.from(topic);
kafObservable.subscribe(item => {
recieved++;
expect(item, 'Recieved proper message').to.equal('hello');
expect(recieved, 'Correct number of runs').to.equal(1);
_initKafka() {
let kafkaClientOptions = {
kafkaHost: this.appConfig.KAFKA_HOST
};
let kafkaProducerOptions = {
partitionerType: 2
};
let kafkaProducer = new kafka.Producer(new kafka.KafkaClient(kafkaClientOptions), kafkaProducerOptions);
kafkaProducer.on('ready', () => {
this.log.info(`Kafka => Connected to: ${kafkaClientOptions.kafkaHost}`);
this.start();
if (this.runContinuously && !this.cli.flags.checkOnly) {
setInterval(() => {
this.start();
}, this.appConfig.KOHERA_CHECK_CONSISTENCY_INTERVAL * 1000);
}
});
kafkaProducer.on('error', error => {
this.log.error(`Kafka => ${error.message}`);
this.kafkaUtils.checkErrorRate(error);
if (error.message.search('ECONNREFUSED') !== -1) {
process.exit(1);
}
constructor() {
try {
const client = new kafka.KafkaClient({
kafkaHost: getKafkaHost(),
});
this.serviceUp = true;
this.producer = new kafka.Producer(client);
this.producer.on("ready", () => console.log("Kafka producer ready"));
this.producer.on("error", err =>
console.error("Kafka producer error", err)
);
} catch (e) {
this.serviceUp = false;
}
}
exports.register = function (topic, cb) {
var producer = new kafka.Producer(kafkaClient);
console.log('registering topic ' + topic);
producer.createTopics([topic], false, cb);
};
init(diContainer) {
if (this.appConfig.LITE === false) {
this.kafkaHost = this.appConfig.KAFKA_HOST;
this.kafkaClientOptions = {
kafkaHost: this.kafkaHost
};
this.kafkaProducerOptions = {
partitionerType: 2
};
this.kafkaProducer = new kafka.Producer(new kafka.KafkaClient(this.kafkaClientOptions), this.kafkaProducerOptions);
this.kafkaProducer.on('error', error => {
this.log.error(`Kafka => ${error.message}`);
this.kafkaUtils.checkErrorRate(error);
if (error.message.search('ECONNREFUSED') !== -1) {
process.exit(1);
}
});
this.kafkaProducer.on('ready', () => {
this.log.info(`Kafka => Connected to: ${this.kafkaHost}`);
diContainer.kafkaProducer = this.kafkaProducer;
this.controllers = controllers(diContainer);
this.initPrimus();
});
} else {
var shortid = require('shortid');
var kafka = require('kafka-node');
var redisClient = require('redis').createClient();
const {
TOPIC,
PRODUCER_CONFIG,
KAFKA_HOST,
PUBSUB_TOPIC,
API_PORT,
API_CON_TIMEOUT
} = require('./config');
const { getPartition, throwIf, isValidEvent } = require('./utils');
// HTTP application instance.
// NOTE(review): `express` (and `bodyParser`, `Bluebird` below) are not
// required in the visible lines — presumably imported elsewhere; confirm.
var app = express();
// Single Kafka client, pointed at KAFKA_HOST, shared by the producer and the
// admin handle below.
const client = new kafka.KafkaClient({ kafkaHost: KAFKA_HOST });
// Producer configured from PRODUCER_CONFIG; `getPartition` (from ./utils) is
// passed as the third constructor argument.
const producer = new kafka.Producer(client, PRODUCER_CONFIG, getPartition);
const admin = new kafka.Admin(client);
// Promise-returning wrapper around the callback-style `producer.send`; bound
// so `this` stays the producer.
const produceMsg = Bluebird.promisify(producer.send.bind(producer));
// Promise-returning wrapper around `admin.describeGroups`.
// NOTE(review): the name looks like a typo for "offsetGet", yet it wraps
// describeGroups rather than an offset lookup — confirm the intent before
// renaming, since other code may reference this identifier.
const offetGet = Bluebird.promisify(admin.describeGroups.bind(admin));
// Parse JSON request bodies for every route.
app.use(bodyParser.json());
// Plain-object registry.
// NOTE(review): presumably the `deps.map` keyed by request id that
// startListener resolves and deletes from — verify at the call site.
const map = {};
function startListener(deps) {
deps.redis.psubscribe(PUBSUB_TOPIC + ':*');
deps.redis.on('pmessage', function(pattern, channel, message) {
const id = channel.split(':')[1];
if (deps.map[id]) {
deps.map[id].resolve(JSON.parse(message));
delete deps.map[id];
/**
 * Creates a producer on the client built from `config` and waits for it to
 * report readiness.
 *
 * The inner promise resolves with the producer when it emits 'ready' and
 * rejects with the string 'Producer got error in connection' when it emits
 * 'error' (the emitted error object itself is not forwarded). That promise
 * is passed to `timeout` together with `_kafkaConnectionTimeoutInMs`;
 * presumably `timeout` rejects if the promise has not settled in that many
 * milliseconds — confirm against its definition.
 *
 * Neither listener is removed after the promise settles.
 *
 * @param config Configuration forwarded to `getKafkaClient`.
 * @returns Whatever `timeout` returns for the readiness promise.
 */
private static getProducer(config: IConfig): Promise {
  const kafkaProducer = new Producer(this.getKafkaClient(config));

  const whenReady = new Promise((resolve, reject) => {
    const onReady = () => resolve(kafkaProducer);
    const onError = () => reject('Producer got error in connection');

    kafkaProducer.on('ready', onReady);
    kafkaProducer.on('error', onError);
  });

  return timeout(whenReady, this._kafkaConnectionTimeoutInMs);
}
async getProducer(config) {
const client = await this.getKafkaClient(config);
const producer = new kafka_node_1.Producer(client);
return producer;
}
async getConsumer(topicId, config) {
return new Promise((resolve, reject) => {
const producer = new Producer(this.client);
Logger.trace(`Waiting for kafka publisher client connection`);
producer.on('error', async (err: any) => {
Logger.error(`Error on publishing kafka message ${new Json().stringify(err)}`);
producer.close();
this.client.close();
reject(err);
});
Logger.trace(`Kafka publisher is ready`);
producer.send(this.kafkaPayload, async (err, data) => {
if (err) {
Logger.error(`Error sending kafka message ${new Json().stringify(err)}`);
reject(err);
} else {
producer.close();
this.onSend(data, resolve);