log('Using HighLevelConsumer');
consumer = new kafka.HighLevelConsumer(
client,
[
{
topic: 'test'
}
],
{
fromOffset: false,
groupId: uuid()
}
);
} else {
log('Using ConsumerGroup');
consumer = new kafka.ConsumerGroup(
{
host: process.env.ZOOKEEPER,
fromOffset: 'latest',
groupId: uuid()
},
['test']
);
}
consumer.on('error', function(err) {
log('Error occurred in consumer:', err);
var span = instana.currentSpan();
span.disableAutoEnd();
// simulating asynchronous follow-up steps with setTimeout and request-promise
setTimeout(function() {
request('http://127.0.0.1:' + agentPort).finally(function() {
  // end the span manually once the async follow-up has finished
  span.end();
});
}, 100);
});
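The branch above presumably switches on the installed kafka-node version: ConsumerGroup superseded the now-removed HighLevelConsumer in later kafka-node releases. A minimal, self-contained sketch of that feature detection; the ZooKeeper fallback, topic 'test', and group id 'example-group' are placeholder values.

// Sketch: pick whichever consumer API the installed kafka-node exposes.
var kafka = require('kafka-node');
var host = process.env.ZOOKEEPER || 'localhost:2181'; // placeholder fallback
var consumer;
if (typeof kafka.ConsumerGroup === 'function') {
  // newer kafka-node: ConsumerGroup manages its own connection
  consumer = new kafka.ConsumerGroup(
    { host: host, fromOffset: 'latest', groupId: 'example-group' },
    ['test']
  );
} else {
  // older kafka-node: HighLevelConsumer needs an explicit client
  var client = new kafka.Client(host);
  consumer = new kafka.HighLevelConsumer(client, [{ topic: 'test' }], { groupId: 'example-group' });
}
consumer.on('message', function (message) {
  console.log(message.topic, message.value);
});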
getKafkaConsumer = function(cohortName) {
console.log('Invoked getKC');
// Close any previous Kafka resources
if (consumer != null) consumer.close();
if (client != null) client.close();
// Get new Kafka resources
kafka_endpoint = process.env.KAFKA_ENDPOINT;
console.log('Will connect to kafka using '+kafka_endpoint);
client = new kafka.KafkaClient({ kafkaHost : kafka_endpoint });
console.log('Client connected to kafka using '+kafka_endpoint);
admin = new kafka.Admin(client);
console.log('Kafka Admin created - try to list topics...');
admin.listTopics((err, res) => { console.log('topics', res); });
console.log('Try to create Kafka Consumer...');
consumer = new kafka.Consumer( client, [ { topic: 'open-metadata.repository-services.cohort.'+cohortName+'.OMRSTopic', partition: 0 } ], { autoCommit: true });
consumer.on('error', function() { io.emit('cohort', 'does not exist - please try a different cohort name')});
consumer.on('message', function(message) {
try {
var data = JSON.parse(message.value);
io.emit('event', data); console.log(data);
}
catch (e) {
if (e instanceof SyntaxError) {
console.log('Could not make sense of JSON - skipping this event');
} else {
console.log('Encountered (non-syntax) error in JSON.parse - skipping this event');
        }
      }
    });
  };
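A short usage sketch for getKafkaConsumer above; the cohort name is a placeholder, and KAFKA_ENDPOINT must point at a broker such as 'localhost:9092'.

// Sketch: connect the viewer to one cohort's OMRS topic ('cocoCohort' is made up).
process.env.KAFKA_ENDPOINT = process.env.KAFKA_ENDPOINT || 'localhost:9092';
getKafkaConsumer('cocoCohort');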
function setKafka(){
// setting up the Kafka consumer
console.log("Setting up Kafka clients");
Consumer = kafka.Consumer;
client = new kafka.Client('zk:2181/');
consumer = new Consumer(
client,
// payloads
[{ topic: 'speedd-fraud-actions', partition: 0, offset: 0 },
{ topic: 'speedd-fraud-out-events', partition: 0, offset: 0 }
],
// options
{fromOffset: true} // true = read messages from beginning
);
//// Setting up Kafka Producer
Producer = kafka.Producer;
producer = new Producer(client);
producer.on('ready', function () {
  console.log('Kafka producer ready');
});
}
function setKafka(){
// setting up the Kafka consumer
console.log("Setting up Kafka clients");
Consumer = kafka.Consumer;
client = new kafka.Client('localhost:2181/');
consumer = new Consumer(
client,
// payloads
[{ topic: 'speedd-traffic-actions', partition: 0, offset: 0 },
{ topic: 'speedd-traffic-out-events', partition: 0, offset: 0 }
],
// options
{fromOffset: true} // true = read messages from beginning
);
//// Setting up Kafka Producer
Producer = kafka.Producer;
producer = new Producer(client);
payloads = [
  { topic: 'speedd-out-events', messages: 'THIS IS THE NEW APP', partition: 0 }
];
producer.on('ready', function () {
  producer.send(payloads, function (err, data) {
    console.log(data);
  });
});
}
function setKafka(){
// setting up the Kafka consumer
console.log("Setting up Kafka clients");
Consumer = kafka.Consumer;
client = new kafka.Client('localhost:2181/');
consumer = new Consumer(
client,
// payloads
[{ topic: 'speedd-fraud-actions', partition: 0, offset: 0 },
{ topic: 'speedd-fraud-out-events', partition: 0, offset: 0 },
{ topic: 'speedd-fraud-in-events', partition: 0, offset: 0 }
],
// options
{fromOffset: true} // true = read messages from beginning
);
//// Setting up Kafka Producer
Producer = kafka.Producer;
producer = new Producer(client);
payloads = [
  { topic: 'speedd-fraud-out-events', messages: 'THIS IS THE NEW APP', partition: 0 }
];
}
function initializeKafkaConsumer(attempt) {
try {
console.log("Try to initialize Kafka Client and Consumer, attempt " + attempt);
var client = new kafka.Client(kafkaHost + ":" + zookeeperPort + "/");
console.log("created client for " + kafkaHost);
consumer = new Consumer(
client,
[],
{ fromOffset: true }
);
console.log("Kafka Client and Consumer initialized " + consumer);
// register the handler for any messages received by the consumer on any topic it is listening to.
consumer.on('message', function (message) {
console.log("event received");
handleEventBusMessage(message);
});
consumer.on('error', function (err) {
console.log("error in creation of Kafka consumer " + JSON.stringify(err));
console.log("Try again in 5 seconds");
setTimeout(initializeKafkaConsumer, 5000, attempt + 1);
});
} catch (err) {
  console.log("Error initializing Kafka consumer: " + JSON.stringify(err));
}
}
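Because the consumer above is created with an empty topic list, topics presumably get added once the connection is up. A minimal sketch using kafka-node's consumer.addTopics; the ZooKeeper address and topic name are placeholders.

// Sketch: subscribe an already-initialized consumer to a topic after startup.
var kafka = require("kafka-node");
var client = new kafka.Client("localhost:2181/"); // placeholder ZooKeeper address
var consumer = new kafka.Consumer(client, [], { fromOffset: true });
consumer.addTopics(
  [{ topic: "example-topic", partition: 0, offset: 0 }], // placeholder topic
  function (err, added) {
    if (err) return console.log("Could not add topic: " + JSON.stringify(err));
    console.log("Now listening on: " + added);
  },
  true // payloads carry explicit offsets, matching { fromOffset: true }
);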
import logger from "../config/logger";
import kafka from "kafka-node";
import envVariables from "../EnvironmentVariables";
const Producer = kafka.Producer;
let client;
// if (process.env.NODE_ENV === "development") {
// client = new kafka.Client();
client = new kafka.KafkaClient({ kafkaHost: envVariables.KAFKA_BROKER_HOST });
// console.log("local - ");
// } else {
// client = new kafka.KafkaClient({ kafkaHost: envVariables.KAFKA_BROKER_HOST });
// console.log("cloud - ");
// }
const producer = new Producer(client);
producer.on("ready", function() {
logger.info("Producer is ready");
});
producer.on("error", function(err) {
logger.error("Producer is in error state");
logger.error(err.stack || err);
});
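Once ready fires, messages can be published with producer.send. A self-contained sketch; the broker address, topic, and payload are invented for illustration.

// Sketch: publish one message after the producer connects.
const kafka = require("kafka-node");
const client = new kafka.KafkaClient({ kafkaHost: "localhost:9092" }); // placeholder broker
const producer = new kafka.Producer(client);
producer.on("ready", function() {
  producer.send(
    [{ topic: "example-topic", messages: JSON.stringify({ hello: "world" }) }], // placeholders
    function(err, data) {
      if (err) console.error(err);
      else console.log("sent:", data);
    }
  );
});
producer.on("error", function(err) {
  console.error(err);
});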
exports.consume = function (topic, cb) {
var options = {
autoCommit: false,
fromBeginning: true,
fetchMaxWaitMs: 1000,
fetchMaxBytes: 1024*1024
};
// read explicitly from partitions 0 and 1
var topics = [
{ topic: topic, partition: 0 },
{ topic: topic, partition: 1 }
];
var consumer = new kafka.Consumer(kafkaClient, topics, options);
var offset = new kafka.Offset(kafkaClient);
consumer.on('message', cb);
consumer.on('error', function (err) {
console.log(err);
throw err;
});
// recompute offset
consumer.on('offsetOutOfRange', function (topic) {
topic.maxNum = 2;
offset.fetch([topic], function (err, offsets) {
  if (err) {
    return console.log(err);
  }
  var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
  consumer.setOffset(topic.topic, topic.partition, min);
});
});
};
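A usage sketch for the exported consume helper; the module path and topic name are placeholders. Note that since autoCommit is false and the consumer instance stays private to the module, nothing ever commits offsets; returning the consumer from consume would let callers invoke consumer.commit() once a batch has been processed.

// Sketch: wire a handler to one topic ('./kafka-consumer' and 'bounceRate' are made up).
var queue = require('./kafka-consumer');
queue.consume('bounceRate', function (message) {
  console.log('partition ' + message.partition + ', offset ' + message.offset + ': ' + message.value);
});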
app.use(express.static(__dirname + '/public'));
app.use('/bower_components', express.static(__dirname + '/bower_components'));
app.get('/', function (req, res) {
res.sendFile(__dirname + '/index.html');
});
app.get('/historical', function (req, res) {
res.sendFile(__dirname + '/public/historical.html');
});
// Kafka Consumer Config
var zkserver = 'localhost:2181'; // ZooKeeper address
var kafka_client_id = 'reporting-layer';
var kafkaClient = new kafka.Client(zkserver,kafka_client_id);
var consumer = new kafka.Consumer(
  kafkaClient,
  [
    { topic: 'bounceRate' },
    { topic: 'averageTime' },
    { topic: 'usersPerCategory' },
    { topic: 'hitsByMarketingChannels' },
    { topic: 'pagesByBounceRate' }
  ],
  { autoCommit: true }
);
//cassandra configurations
var client = new cassandra.Client({contactPoints: ['localhost'], keyspace: 'rajsarka'});
// Define action to take when a websocket connection is established
io.on('connection', function (socket) {
console.log("A client is connected.");
//fetch conversion summary data from cassandra
socket.on('fetch-conversionSummaryChartData',function(query){
client.execute(query, function (err, result) {
if (err) {
  console.log(err);
  return;
}
console.log('executing query: ' + query);
console.log('processing data');
});
});
});
function setKafka(){
// setting up the Kafka consumer
console.log("Setting up Kafka clients");
Consumer = kafka.Consumer;
client = new kafka.Client(zk);
offset = new kafka.Offset(client);
offset.fetch([
{ topic: 'speedd-fraud-out-events', partition: 0, time: -1, maxNum: 1 }
], function (err, data) {
if(err != null){
console.error("Error: " + JSON.stringify(err));
return;
}
console.log("Offset data: " + JSON.stringify(data));
var outEventsOffset = data['speedd-fraud-out-events'][0][0];
console.log("Events offset: " + outEventsOffset);
consumer = new Consumer(
  client,
  [{ topic: 'speedd-fraud-out-events', partition: 0, offset: outEventsOffset }],
  { fromOffset: true }
);
});
}
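In kafka.Offset.fetch, time: -1 asks for the latest offset and time: -2 for the earliest. A small sketch fetching the earliest retained offset of the same partition, reusing the offset instance created above.

// Sketch: fetch the earliest (time: -2) offset still retained for the partition.
offset.fetch(
  [{ topic: 'speedd-fraud-out-events', partition: 0, time: -2, maxNum: 1 }],
  function (err, data) {
    if (err) { return console.error("Error: " + JSON.stringify(err)); }
    var earliest = data['speedd-fraud-out-events'][0][0];
    console.log("Earliest available offset: " + earliest);
  }
);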