How to use the kafkajs.Kafka function in kafkajs

To help you get started, we’ve selected a few kafkajs examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github al66 / imicros-flow / examples / performance.connections.native.js View on Github external
"use strict";
const { Kafka } = require("kafkajs");

const PERFORMANCE_TEST = true;

const serviceLogger = () => ({ label, log }) => {
    if (!PERFORMANCE_TEST) console.log(label + " namespace:" + log.message, log);
};

// Create the client with the broker list
const kafka = new Kafka({
    clientId: "test",
    brokers: ["192.168.2.124:9092"],
    logLevel: 5,
    logCreator: serviceLogger
});


const consumers = [
    kafka.consumer({ groupId: "g1" + Date.now() }),
    kafka.consumer({ groupId: "g2" + Date.now() }),
    kafka.consumer({ groupId: "g3" + Date.now() })
];
const producer = kafka.producer();

let receipts = 0;
let emits = 0;
github al66 / imicros-flow / lib / flow.static.subscriber.mixed.js View on Github external
this.serviceLogger = kafkalogLevel => ({ namespace, level, label, log }) => {
            switch(level) {
              case logLevel.ERROR:
              case logLevel.NOTHING:
                return this.logger.error("KAFKAJS: " + namespace + log.message, log);
              case logLevel.WARN:
                return this.logger.warn("KAFKAJS: " + namespace + log.message, log);
              case logLevel.INFO:
                return this.logger.info("KAFKAJS: " + namespace + log.message, log);
              case logLevel.DEBUG:
                return this.logger.debug("KAFKAJS: " + namespace + log.message, log);
            }
        }
        
        // Create the client with the broker list
        this.kafka = new Kafka({
          clientId: this.clientId,
          brokers: this.brokers,
          logLevel: 5, //logLevel.DEBUG,
          logCreator: this.serviceLogger
        })

        // Map kafka-node log to service logger
        let serviceLogger = (name) => {
            return {
                debug: this.logger.debug,
                info: this.logger.info,
                warn: this.logger.warn,
                error: this.logger.error
            }
        }
        kafkaLogging.setLoggerProvider(serviceLogger);
github al66 / imicros-flow / lib / flow.publisher.js View on Github external
case logLevel.INFO:
                    return this.logger.info("namespace:" + log.message, log);
                case logLevel.DEBUG:
                    return this.logger.debug("namespace:" + log.message, log);
            }
        };
        
        this.defaults = {
            connectionTimeout: 1000,
            retry: {
                initialRetryTime: 100,
                retries: 8
            }
        };
				// Create the client with the broker list
        this.kafka = new Kafka({
            clientId: this.clientId,
            brokers: this.brokers,
            logLevel: 5, //logLevel.DEBUG,
            logCreator: this.serviceLogger,
            ssl: this.settings.ssl || null,     // refer to kafkajs documentation
            sasl: this.settings.sasl || null,   // refer to kafkajs documentation
            connectionTimeout: this.settings.connectionTimeout ||  this.defaults.connectionTimeout,
            retry: this.settings.retry || this.defaults.retry
        });

        this.defaultTopic = this.settings.topic || "events";
        
    },
github al66 / imicros-flow / lib / flow.static.subscriber.js View on Github external
case logLevel.INFO:
                    return this.logger.info("KAFKAJS: " + namespace + log.message, log);
                case logLevel.DEBUG:
                    return this.logger.debug("KAFKAJS: " + namespace + log.message, log);
            }
        };

        this.defaults = {
            connectionTimeout: 1000,
            retry: {
                initialRetryTime: 100,
                retries: 8
            }
        };
				// Create the client with the broker list
        this.kafka = new Kafka({
            clientId: this.clientId,
            brokers: this.brokers,
            logLevel: 5, //logLevel.DEBUG,
            logCreator: this.serviceLogger,
            ssl: this.settings.ssl || null,     // refer to kafkajs documentation
            sasl: this.settings.sasl || null,   // refer to kafkajs documentation
            connectionTimeout: this.settings.connectionTimeout ||  this.defaults.connectionTimeout,
            retry: this.settings.retry || this.defaults.retry
        });

        this.topics = {
            events: this.settings.topics ? this.settings.topics.events || "events" : "events"
        };

        this.publisher = this.settings.publisher || "flow.publisher";
github erikengervall / dockest / packages / examples / multiple-resources / kafka-1-kafkajs / app.ts View on Github external
const { Kafka, logLevel } = require('kafkajs') // eslint-disable-line @typescript-eslint/no-var-requires

type JestFn = (_: any) => void

const kafka = new Kafka({
  brokers: ['localhost:9092'],
  clientId: 'dockest_example',
  logLevel: logLevel.NOTHING,
  retry: {
    initialRetryTime: 2500,
    retries: 10,
  },
})

const createConsumer = (
  mockConsumptionCallback: JestFn,
): { consumer: any; startConsuming: () => Promise; stopConsuming: () => Promise } => {
  const consumer = kafka.consumer({ groupId: 'dockest_group_1' })

  const startConsuming = async () => {
    await consumer.connect()
github al66 / imicros-flow / lib / flow.publisher.kafkajs.js View on Github external
this.serviceLogger = () => ({ level, log }) => {
            switch(level) {
                case logLevel.ERROR:
                case logLevel.NOTHING:
                    return this.logger.error("namespace:" + log.message, log);
                case logLevel.WARN:
                    return this.logger.warn("namespace:" + log.message, log);
                case logLevel.INFO:
                    return this.logger.info("namespace:" + log.message, log);
                case logLevel.DEBUG:
                    return this.logger.debug("namespace:" + log.message, log);
            }
        };
        
        // Create the client with the broker list
        this.kafka = new Kafka({
            clientId: this.clientId,
            brokers: this.brokers,
            logLevel: logLevel.DEBUG,
            logCreator: this.serviceLogger,
            connectionTimeout: this.settings.connectionTimeout ||  1000
        });

        this.topics = {
            events: this.settings.topics ? this.settings.topics.events || "events" : "events"
        };
        
    },
github vitta-health / attiv / framework / crosscutting / events / storeKafka.ts View on Github external
init() {
    const brokers = [...process.env.KAFKA_BROKERS_ENDPOINT.split(',')];

    this.kafka = new Kafka({
      clientId: process.env.KAFKA_CLIENT_IDENTIFIER,
      brokers,
    });

    this.subscribes.forEach(subscribe => {
      if (subscribe.listener && subscribe.name) {
        this.addListener(subscribe.listener, subscribe.name);
      }
    });
  }
github nodefluent / node-sinek / lib / kafkajs / JSConsumer.js View on Github external
}

    if (!config.noptions) {
      config.noptions = {};
    }

    const brokers = config.kafkaHost ||
      (config.noptions["metadata.broker.list"] && config.noptions["metadata.broker.list"].split(","));
    const clientId = config.noptions["client.id"];

    if (!brokers || !clientId) {
      throw new Error("You are missing a broker or group configs");
    }

    if (config.noptions["security.protocol"]) {
      this.kafkaClient = new Kafka({
        brokers,
        clientId,
        ssl: {
          ca: [fs.readFileSync(config.noptions["ssl.ca.location"], "utf-8")],
          cert: fs.readFileSync(config.noptions["ssl.certificate.location"], "utf-8"),
          key: fs.readFileSync(config.noptions["ssl.key.location"], "utf-8"),
          passphrase: config.noptions["ssl.key.password"],
        },
        sasl: {
          mechanism: config.noptions["sasl.mechanisms"],
          username: config.noptions["sasl.username"],
          password: config.noptions["sasl.password"],
        },
      });
    } else {
      this.kafkaClient = new Kafka({ brokers, clientId });
github confluentinc / training-developer-src / solution / fallback-producer / producer / server.js View on Github external
const SQL = require('sql-template-strings');
const sqlite = require('sqlite');
const { Kafka } = require('kafkajs')


const kafka = new Kafka({
  clientId: 'vp-producer-fallback',
  brokers: ['kafka:9092']
})

const producer = kafka.producer();
const run = async () => {
    await producer.connect()
    await sendMessages();
}

async function sendMessages(){
    const db = await sqlite.open('../db/vehicle-positions.db');
    
    let lastId = 0;
    let currentId = 0;
github nodefluent / node-sinek / lib / kafkajs / JSProducer.js View on Github external
brokers,
        clientId,
        ssl: {
          ca: [fs.readFileSync(config.noptions["ssl.ca.location"], "utf-8")],
          cert: fs.readFileSync(config.noptions["ssl.certificate.location"], "utf-8"),
          key: fs.readFileSync(config.noptions["ssl.key.location"], "utf-8"),
          passphrase: config.noptions["ssl.key.password"],
        },
        sasl: {
          mechanism: config.noptions["sasl.mechanisms"],
          username: config.noptions["sasl.username"],
          password: config.noptions["sasl.password"],
        },
      });
    } else {
      this.kafkaClient = new Kafka({ brokers, clientId });
    }

    this.config = config;
    this._health = new ProducerHealth(this, this.config.health || {});
    this._adminClient = this.kafkaClient.admin();

    this.paused = false;
    this.producer = null;
    this._producerPollIntv = null;
    this.defaultPartitionCount = defaultPartitionCount;
    this._partitionCounts = {};
    this._inClosing = false;
    this._totalSentMessages = 0;
    this._lastProcessed = null;
    this._analyticsOptions = null;
    this._analyticsIntv = null;

kafkajs

A modern Apache Kafka client for node.js

MIT
Latest version published 2 years ago

Package Health Score

82 / 100
Full package analysis