Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async shouldNackAndRequeueTimes3(
message: object,
raw: amqplib.ConsumeMessage,
) {
++this.nackCount;
nackAndRequeueHandler();
// await sleep(15);
if (this.nackCount >= 3) {
return new Nack();
}
return new Nack(true);
}
}
async shouldNackAndRequeueTimes3(
message: object,
raw: amqplib.ConsumeMessage,
) {
++this.nackCount;
nackAndRequeueHandler();
// await sleep(15);
if (this.nackCount >= 3) {
return new Nack();
}
return new Nack(true);
}
}
@Injectable()
class SubscribeService {
nackCount = 0;
@RabbitSubscribe({
exchange,
routingKey: nackRoutingKey,
queue: nackRoutingKey,
})
shouldNack(message: object, raw: amqplib.ConsumeMessage) {
nackHandler();
return new Nack();
}
@RabbitSubscribe({
exchange,
routingKey: nackAndRequeueRoutingKey,
queue: nackAndRequeueRoutingKey,
})
async shouldNackAndRequeueTimes3(
message: object,
raw: amqplib.ConsumeMessage,
) {
++this.nackCount;
nackAndRequeueHandler();
// await sleep(15);
if (this.nackCount >= 3) {
return new Nack();
}
return new Nack(true);
}
it('should configure RabbitMQ with useExisting explicit provide', async () => {
const spy = jest.spyOn(amqplib, 'connect');
const instance = new RabbitConfig();
app = await Test.createTestingModule({
imports: [
RabbitMQModule.forRootAsync(RabbitMQModule, {
useExisting: {
provide: RabbitConfig,
value: instance,
},
}),
],
}).compile();
expect(spy).toHaveBeenCalledTimes(1);
expect(spy).toHaveBeenCalledWith(uri);
});
beforeAll(async () => {
const moduleFixture = await Test.createTestingModule({
providers: [SubscribeService],
imports: [
RabbitMQModule.forRootAsync(RabbitMQModule, {
useFactory: () => ({
exchanges: [
{
name: exchange,
type: 'topic',
},
],
uri,
}),
}),
],
}).compile();
app = moduleFixture.createNestApplication();
await app.init();
amqpConnection = app.get(AmqpConnection);
beforeEach(async () => {
const moduleFixture = await Test.createTestingModule({
providers: [SubscribeService],
imports: [
RabbitMQModule.forRoot(RabbitMQModule, {
exchanges: [
{
name: exchange,
type: 'topic',
},
],
uri,
}),
],
}).compile();
app = moduleFixture.createNestApplication();
amqpConnection = app.get(AmqpConnection);
await app.init();
});
it('should configure RabbitMQ', async () => {
const spy = jest.spyOn(amqplib, 'connect');
app = await Test.createTestingModule({
imports: [
RabbitMQModule.forRoot(RabbitMQModule, {
uri,
}),
],
}).compile();
expect(spy).toHaveBeenCalledTimes(1);
expect(spy).toHaveBeenCalledWith(uri);
});
});
shouldNack(message: object, raw: amqplib.ConsumeMessage) {
nackHandler();
return new Nack();
}
import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq';
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { RpcService } from './rpc/rpc.service';
const rabbitHost = process.env.NODE_ENV === 'ci' ? 'rabbit' : 'localhost';
@Module({
imports: [
RabbitMQModule.forRootAsync(RabbitMQModule, {
useFactory: () => ({
exchanges: [
{
name: 'exchange1',
type: 'topic',
},
],
uri: `amqp://rabbitmq:rabbitmq@${rabbitHost}:5672`,
}),
}),
],
controllers: [AppController],
providers: [RpcService],
})
export class AppModule {}
} from '@golevelup/nestjs-rabbitmq';
import { INestApplication, Injectable } from '@nestjs/common';
import { Test } from '@nestjs/testing';
const testHandler = jest.fn();
const exchange = 'testSubscribeExhange';
const routingKey1 = 'testSubscribeRoute1';
const routingKey2 = 'testSubscribeRoute2';
const testMessage = {
messageProp: 42,
};
@Injectable()
class SubscribeService {
@RabbitSubscribe({
exchange,
routingKey: [routingKey1, routingKey2],
queue: 'subscribeQueue',
})
handleSubscribe(message: object) {
testHandler(message);
}
}
describe('Rabbit Subscribe', () => {
let app: INestApplication;
let amqpConnection: AmqpConnection;
const rabbitHost = process.env.NODE_ENV === 'ci' ? 'rabbit' : 'localhost';
const uri = `amqp://rabbitmq:rabbitmq@${rabbitHost}:5672`;