Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"use strict";
const { Kafka } = require("kafkajs");
const PERFORMANCE_TEST = true;
const serviceLogger = () => ({ label, log }) => {
if (!PERFORMANCE_TEST) console.log(label + " namespace:" + log.message, log);
};
// Create the client with the broker list
const kafka = new Kafka({
clientId: "test",
brokers: ["192.168.2.124:9092"],
logLevel: 5,
logCreator: serviceLogger
});
const consumers = [
kafka.consumer({ groupId: "g1" + Date.now() }),
kafka.consumer({ groupId: "g2" + Date.now() }),
kafka.consumer({ groupId: "g3" + Date.now() })
];
const producer = kafka.producer();
let receipts = 0;
let emits = 0;
this.serviceLogger = kafkalogLevel => ({ namespace, level, label, log }) => {
switch(level) {
case logLevel.ERROR:
case logLevel.NOTHING:
return this.logger.error("KAFKAJS: " + namespace + log.message, log);
case logLevel.WARN:
return this.logger.warn("KAFKAJS: " + namespace + log.message, log);
case logLevel.INFO:
return this.logger.info("KAFKAJS: " + namespace + log.message, log);
case logLevel.DEBUG:
return this.logger.debug("KAFKAJS: " + namespace + log.message, log);
}
}
// Create the client with the broker list
this.kafka = new Kafka({
clientId: this.clientId,
brokers: this.brokers,
logLevel: 5, //logLevel.DEBUG,
logCreator: this.serviceLogger
})
// Map kafka-node log to service logger
let serviceLogger = (name) => {
return {
debug: this.logger.debug,
info: this.logger.info,
warn: this.logger.warn,
error: this.logger.error
}
}
kafkaLogging.setLoggerProvider(serviceLogger);
case logLevel.INFO:
return this.logger.info("namespace:" + log.message, log);
case logLevel.DEBUG:
return this.logger.debug("namespace:" + log.message, log);
}
};
this.defaults = {
connectionTimeout: 1000,
retry: {
initialRetryTime: 100,
retries: 8
}
};
// Create the client with the broker list
this.kafka = new Kafka({
clientId: this.clientId,
brokers: this.brokers,
logLevel: 5, //logLevel.DEBUG,
logCreator: this.serviceLogger,
ssl: this.settings.ssl || null, // refer to kafkajs documentation
sasl: this.settings.sasl || null, // refer to kafkajs documentation
connectionTimeout: this.settings.connectionTimeout || this.defaults.connectionTimeout,
retry: this.settings.retry || this.defaults.retry
});
this.defaultTopic = this.settings.topic || "events";
},
case logLevel.INFO:
return this.logger.info("KAFKAJS: " + namespace + log.message, log);
case logLevel.DEBUG:
return this.logger.debug("KAFKAJS: " + namespace + log.message, log);
}
};
this.defaults = {
connectionTimeout: 1000,
retry: {
initialRetryTime: 100,
retries: 8
}
};
// Create the client with the broker list
this.kafka = new Kafka({
clientId: this.clientId,
brokers: this.brokers,
logLevel: 5, //logLevel.DEBUG,
logCreator: this.serviceLogger,
ssl: this.settings.ssl || null, // refer to kafkajs documentation
sasl: this.settings.sasl || null, // refer to kafkajs documentation
connectionTimeout: this.settings.connectionTimeout || this.defaults.connectionTimeout,
retry: this.settings.retry || this.defaults.retry
});
this.topics = {
events: this.settings.topics ? this.settings.topics.events || "events" : "events"
};
this.publisher = this.settings.publisher || "flow.publisher";
const { Kafka, logLevel } = require('kafkajs') // eslint-disable-line @typescript-eslint/no-var-requires
type JestFn = (_: any) => void
const kafka = new Kafka({
brokers: ['localhost:9092'],
clientId: 'dockest_example',
logLevel: logLevel.NOTHING,
retry: {
initialRetryTime: 2500,
retries: 10,
},
})
const createConsumer = (
mockConsumptionCallback: JestFn,
): { consumer: any; startConsuming: () => Promise; stopConsuming: () => Promise } => {
const consumer = kafka.consumer({ groupId: 'dockest_group_1' })
const startConsuming = async () => {
await consumer.connect()
this.serviceLogger = () => ({ level, log }) => {
switch(level) {
case logLevel.ERROR:
case logLevel.NOTHING:
return this.logger.error("namespace:" + log.message, log);
case logLevel.WARN:
return this.logger.warn("namespace:" + log.message, log);
case logLevel.INFO:
return this.logger.info("namespace:" + log.message, log);
case logLevel.DEBUG:
return this.logger.debug("namespace:" + log.message, log);
}
};
// Create the client with the broker list
this.kafka = new Kafka({
clientId: this.clientId,
brokers: this.brokers,
logLevel: logLevel.DEBUG,
logCreator: this.serviceLogger,
connectionTimeout: this.settings.connectionTimeout || 1000
});
this.topics = {
events: this.settings.topics ? this.settings.topics.events || "events" : "events"
};
},
init() {
const brokers = [...process.env.KAFKA_BROKERS_ENDPOINT.split(',')];
this.kafka = new Kafka({
clientId: process.env.KAFKA_CLIENT_IDENTIFIER,
brokers,
});
this.subscribes.forEach(subscribe => {
if (subscribe.listener && subscribe.name) {
this.addListener(subscribe.listener, subscribe.name);
}
});
}
}
if (!config.noptions) {
config.noptions = {};
}
const brokers = config.kafkaHost ||
(config.noptions["metadata.broker.list"] && config.noptions["metadata.broker.list"].split(","));
const clientId = config.noptions["client.id"];
if (!brokers || !clientId) {
throw new Error("You are missing a broker or group configs");
}
if (config.noptions["security.protocol"]) {
this.kafkaClient = new Kafka({
brokers,
clientId,
ssl: {
ca: [fs.readFileSync(config.noptions["ssl.ca.location"], "utf-8")],
cert: fs.readFileSync(config.noptions["ssl.certificate.location"], "utf-8"),
key: fs.readFileSync(config.noptions["ssl.key.location"], "utf-8"),
passphrase: config.noptions["ssl.key.password"],
},
sasl: {
mechanism: config.noptions["sasl.mechanisms"],
username: config.noptions["sasl.username"],
password: config.noptions["sasl.password"],
},
});
} else {
this.kafkaClient = new Kafka({ brokers, clientId });
const SQL = require('sql-template-strings');
const sqlite = require('sqlite');
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'vp-producer-fallback',
brokers: ['kafka:9092']
})
const producer = kafka.producer();
const run = async () => {
await producer.connect()
await sendMessages();
}
async function sendMessages(){
const db = await sqlite.open('../db/vehicle-positions.db');
let lastId = 0;
let currentId = 0;
brokers,
clientId,
ssl: {
ca: [fs.readFileSync(config.noptions["ssl.ca.location"], "utf-8")],
cert: fs.readFileSync(config.noptions["ssl.certificate.location"], "utf-8"),
key: fs.readFileSync(config.noptions["ssl.key.location"], "utf-8"),
passphrase: config.noptions["ssl.key.password"],
},
sasl: {
mechanism: config.noptions["sasl.mechanisms"],
username: config.noptions["sasl.username"],
password: config.noptions["sasl.password"],
},
});
} else {
this.kafkaClient = new Kafka({ brokers, clientId });
}
this.config = config;
this._health = new ProducerHealth(this, this.config.health || {});
this._adminClient = this.kafkaClient.admin();
this.paused = false;
this.producer = null;
this._producerPollIntv = null;
this.defaultPartitionCount = defaultPartitionCount;
this._partitionCounts = {};
this._inClosing = false;
this._totalSentMessages = 0;
this._lastProcessed = null;
this._analyticsOptions = null;
this._analyticsIntv = null;