How to use the @nestjs/microservices.Transport.KAFKA function in @nestjs/microservices

To help you get started, weโ€™ve selected a few @nestjs/microservices examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github nestjs / nest / integration / microservices / e2e / sum-kafka.spec.ts View on Github external
it(`Start Kafka app`, async () => {
    const module = await Test.createTestingModule({
      controllers: [KafkaController, KafkaMessagesController],
    }).compile();

    app = module.createNestApplication();
    server = app.getHttpAdapter().getInstance();

    app.connectMicroservice({
      transport: Transport.KAFKA,
      options: {
        client: {
          brokers: ['localhost:9092'],
        },
      },
    });
    await app.startAllMicroservicesAsync();
    await app.init();
  }).timeout(30000);
github nestjs / nest / integration / microservices / src / kafka / kafka.controller.ts View on Github external
import { Body, Controller, HttpCode, OnModuleInit, Post } from '@nestjs/common';
import { Logger } from '@nestjs/common/services/logger.service';
import { Client, ClientKafka, Transport } from '@nestjs/microservices';
import { Observable } from 'rxjs';
import { BusinessDto } from './dtos/business.dto';
import { UserDto } from './dtos/user.dto';

@Controller()
export class KafkaController implements OnModuleInit {
  protected readonly logger = new Logger(KafkaController.name);
  static IS_NOTIFIED = false;
  static MATH_SUM = 0;

  @Client({
    transport: Transport.KAFKA,
    options: {
      client: {
        brokers: ['localhost:9092'],
      },
    },
  })
  private readonly client: ClientKafka;

  onModuleInit() {
    const requestPatterns = [
      'math.sum.sync.kafka.message',
      'math.sum.sync.without.key',
      'math.sum.sync.plain.object',
      'math.sum.sync.array',
      'math.sum.sync.string',
      'math.sum.sync.number',
github nestjs / nest / integration / microservices / src / kafka / kafka-broadcast.controller.ts View on Github external
import { Controller, Get } from '@nestjs/common';
import {
  Client,
  ClientProxy,
  MessagePattern,
  Transport,
} from '@nestjs/microservices';
import { Observable } from 'rxjs';
import { scan, take } from 'rxjs/operators';

@Controller()
export class KafkaBroadcastController {
  @Client({ transport: Transport.KAFKA })
  client: ClientProxy;

  @Get('broadcast')
  multicats() {
    return this.client
      .send('broadcast.test', {})
      .pipe(scan((a, b) => a + b), take(2));
  }

  @MessagePattern('broadcast.*')
  replyBroadcast(): Observable {
    return new Observable(observer => observer.next(1));
  }
}