Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
// Temporarily do singleton list messages until logic from the http logger is extracted
this.encoder = {encode: span => `[${JSON_V2.encode(span)}]`};
} else {
throw new Error('Unsupported encoder. Valid choices are THRIFT and JSON_V2.');
}
this.topic = options.topic || 'zipkin';
if (clientOpts.connectionString) {
this.client = new kafka.Client(
clientOpts.connectionString, clientOpts.clientId, clientOpts.zkOpts
);
} else {
this.client = new kafka.KafkaClient(clientOpts);
}
this.producer = new kafka.HighLevelProducer(this.client, producerOpts);
this.producerState = 'pending';
this.producer.on('ready', () => {
this.producerState = 'ready';
this.producer.removeAllListeners('ready');
});
this.producer.on('error', this.onError);
this.client.on('error', this.onError);
}
/**
 * Creates the Kafka client and its high-level producer and stores them in the
 * enclosing-scope `client` and `producer` variables.
 *
 * The broker address is read from `envHelper.sunbird_kafka_host`. Producer
 * 'ready' and 'error' events are only reported on the console.
 */
function init() {
  const clientOptions = {
    kafkaHost: envHelper.sunbird_kafka_host,
    maxAsyncRequests: 100
  };

  // Console reporters for the producer lifecycle events.
  const reportReady = () => {
    console.log('Kafka Producer is connected and ready.');
  };
  const reportError = (error) => {
    console.error('Errored at kafka', error);
  };

  client = new kafka.KafkaClient(clientOptions);
  producer = new kafka.HighLevelProducer(client);

  producer.on('ready', reportReady);
  producer.on('error', reportError);
}
// Options handed to the KafkaClient constructor below.
const kafkaOptions = {
  kafkaHost: this.conString,
  // SSL is switched on only when the caller supplied sslOptions.
  ssl: !!_options.sslOptions,
  sslOptions: _options.sslOptions,
  connectTimeout: 1000,
  requestTimeout: 30000,
  // Auto-connect unless the caller explicitly passed `autoConnect: false`.
  // The previous `_options.autoConnect || true` always evaluated to true,
  // so an explicit `false` was silently ignored.
  autoConnect: _options.autoConnect !== false,
  connectRetryOptions: DEFAULT_RETRY_OPTIONS
};
this.client = new KafkaClient(kafkaOptions);
} else {
this.client = new Client(this.conString, clientId, {}, _options.sslOptions || {});
}
this.producer = new HighLevelProducer(this.client, _options);
this.isProducer = true;
this._getLogger().info("starting Producer.");
this.targetTopics = targetTopics;
this._attachProducerListeners();
}
function(callback) {
self.kafkaProducer = new kafka.HighLevelProducer(self.connectionClient);
self.kafkaProducer.on('error', function() {});
callback();
}
], function(err) {
/**
 * Messaging client that wires a kafka-node connection, a high-level consumer
 * and a high-level producer onto the instance.
 *
 * @param {string} name - Forwarded to MessagingClient and passed as the second
 *   argument (client id) of kafka.Client.
 * @param {Object} config - Forwarded to MessagingClient; this constructor reads
 *   `host`, `port` and `topic` from it. The topic doubles as the consumer
 *   group id.
 */
var KafkaClient = function(name, config){
  MessagingClient.call(this, name, config);

  // Construct explicitly with `new` instead of relying on the library's
  // constructor guarding against a plain function call.
  this.connectionClient = new kafka.Client(config.host+':'+config.port+'/', name);

  this.kafkaConsumer = new kafka.HighLevelConsumer(
    this.connectionClient,
    [{topic: config.topic}],
    {groupId: config.topic}
  );
  // A listener must stay registered so an emitted 'error' does not crash the
  // process, but the failure is logged rather than silently discarded.
  this.kafkaConsumer.on('error', function(err) {
    console.error('Kafka consumer error (' + name + '):', err);
  });

  this.kafkaProducer = new kafka.HighLevelProducer(this.connectionClient);
  this.kafkaProducer.on('error', function(err) {
    console.error('Kafka producer error (' + name + '):', err);
  });
};
this.connecting = new Promise((resolve, reject) => {
if (this.connected) {
resolve(true);
} else {
this.client = new kafka.KafkaClient({kafkaHost: this.host});
this.producer = new kafka.HighLevelProducer(this.client);
this.producer.setMaxListeners(0);
this.producer
.on('ready', () => {
this.connected = true;
resolve(true);
});
this.producer
.on('error', (error) => {
this.logger.log({
level: 'error',
message: 'Kafka connector error: ' + error
});
});