Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
getKafkaConsumer = function(cohortName) {
console.log('Invoked getKC');
// Close any previous Kafka resources
if (consumer != null) consumer.close();
if (client != null) client.close();
// Get new Kafka resources
kafka_endpoint = process.env.KAFKA_ENDPOINT;
console.log('Will connect to kafka using '+kafka_endpoint);
client = new kafka.KafkaClient({ kafkaHost : kafka_endpoint });
console.log('Client connected to kafka using '+kafka_endpoint);
admin = new kafka.Admin(client);
console.log('Kafka Admin created - try to list topics...');
admin.listTopics((err, res) => { console.log('topics', res); });
console.log('Try to create Kafka Consumer...');
consumer = new kafka.Consumer( client, [ { topic: 'open-metadata.repository-services.cohort.'+cohortName+'.OMRSTopic', partition: 0 } ], { autoCommit: true });
consumer.on('error', function() { io.emit('cohort', 'does not exist - please try a different cohort name')});
consumer.on('message', function(message) {
try {
var data = JSON.parse(message.value);
io.emit('event', data); console.log(data);
}
catch (e) {
if (e instanceof SyntaxError) {
console.log('Could not make sense of JSON - skipping this event');
} else {
console.log('Encountered (non-syntax) error in JSON.parse - skipping this event');
}
return new Promise((resolve, reject) => {
// Declares a new instance of client that will be used to make a connection
const client = new kafka.KafkaClient({ kafkaHost: kafkaHostURI });
// Declaring a new kafka.Admin instance creates a connection to the Kafka admin API
const admin = new kafka.Admin(client);
// Fetch all topics from the Kafka broker
admin.listTopics((err, data) => {
if (err) return reject(new Error(`getting list of Topics:${err}`));
// Reassign topics with only the object containing the topic data
logger.log('Result of admin.listTopics API call:', data);
const topicsMetadata = data[1].metadata;
logger.log('topicsMetadata obtained:', topicsMetadata);
const topics = Object.entries(topicsMetadata)
.filter(([topicName]) => !topicNamesToIgnore.includes(topicName))
.map(([topicName, topicPartitions]) => ({
numberOfPartitions: Object.keys(topicPartitions).length,
topicName,
return new Promise((resolve, reject) => {
// Declares a new instance of client that will be used to make a connection
const client = new kafka.KafkaClient({ kafkaHost: kafkaHostURI });
// Declaring a new kafka.Admin instance creates a connection to the Kafka admin API
const admin = new kafka.Admin(client);
const brokerResult = {};
// Fetch all topics from the Kafka broker
admin.listTopics((err, data) => {
if (err) {
logger.error(err);
client.close();
return reject({ error: err });
}
// Reassign topics with only the object containing the topic data
// logger.log('brokerMetadata IN BROKER API:', data[0]);
const brokerMetadata = data[0];
const topicsMetadata = data[1].metadata;
logger.log('Object Entries of brokerMetadata', Object.entries(brokerMetadata));
var kafka = require('kafka-node');
var redisClient = require('redis').createClient();
const {
TOPIC,
PRODUCER_CONFIG,
KAFKA_HOST,
PUBSUB_TOPIC,
API_PORT,
API_CON_TIMEOUT
} = require('./config');
const { getPartition, throwIf, isValidEvent } = require('./utils');
var app = express();
const client = new kafka.KafkaClient({ kafkaHost: KAFKA_HOST });
const producer = new kafka.Producer(client, PRODUCER_CONFIG, getPartition);
const admin = new kafka.Admin(client);
const produceMsg = Bluebird.promisify(producer.send.bind(producer));
const offetGet = Bluebird.promisify(admin.describeGroups.bind(admin));
app.use(bodyParser.json());
const map = {};
function startListener(deps) {
deps.redis.psubscribe(PUBSUB_TOPIC + ':*');
deps.redis.on('pmessage', function(pattern, channel, message) {
const id = channel.split(':')[1];
if (deps.map[id]) {
deps.map[id].resolve(JSON.parse(message));
delete deps.map[id];
}