Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
}
if (this._receiver && this._receiver.credit > 0) {
log.messageSession(
"[%s] Receiver '%s': Draining leftover credits(%d).",
this._context.namespace.connectionId,
this.name,
this._receiver.credit
);
// Setting drain must be accompanied by a flow call (aliased to addCredit in this case).
this._receiver.drain = true;
this._receiver.addCredit(1);
} else {
if (this._receiver) {
this._receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
}
this.isReceivingMessages = false;
log.messageSession(
"[%s] Receiver '%s': Resolving receiveMessages() with %d messages.",
this._context.namespace.connectionId,
this.name,
brokeredMessages.length
);
resolve(brokeredMessages);
}
};
}
if (this._receiver && this._receiver.credit > 0) {
log.batching(
"[%s] Receiver '%s': Draining leftover credits(%d).",
this._context.namespace.connectionId,
this.name,
this._receiver.credit
);
// Setting drain must be accompanied by a flow call (aliased to addCredit in this case).
this._receiver.drain = true;
this._receiver.addCredit(1);
} else {
if (this._receiver) {
this._receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
}
this.isReceivingMessages = false;
log.batching(
"[%s] Receiver '%s': Resolving receiveMessages() with %d messages.",
this._context.namespace.connectionId,
this.name,
brokeredMessages.length
);
resolve(brokeredMessages);
}
};
// number of messages concurrently. We will return the user an array of messages that can
// be of size upto maxMessageCount. Then the user needs to accordingly dispose
// (complete,/abandon/defer/deadletter) the messages from the array.
this._receiver!.addCredit(maxMessageCount);
let msg: string = "[%s] Setting the wait timer for %d seconds for receiver '%s'.";
if (reuse) msg += " Receiver link already present, hence reusing it.";
log.batching(msg, this._context.namespace.connectionId, maxWaitTimeInSeconds, this.name);
totalWaitTimer = setTimeout(
actionAfterWaitTimeout,
(maxWaitTimeInSeconds as number) * 1000
);
};
if (this.isOpen()) {
this._receiver!.on(ReceiverEvents.message, onReceiveMessage);
this._receiver!.on(ReceiverEvents.receiverDrained, onReceiveDrain);
addCreditAndSetTimer(true);
} else {
const msg =
`MessageSession "${this.name}" with sessionId "${this.sessionId}", ` +
`is already closed. Hence cannot receive messages in a batch.`;
log.error("[%s] %s", this._context.namespace.connectionId, msg);
reject(new Error(msg));
}
});
}
// number of messages concurrently. We will return the user an array of messages that can
// be of size upto maxMessageCount. Then the user needs to accordingly dispose
// (complete,/abandon/defer/deadletter) the messages from the array.
this._receiver!.addCredit(maxMessageCount);
let msg: string = "[%s] Setting the wait timer for %d seconds for receiver '%s'.";
if (reuse) msg += " Receiver link already present, hence reusing it.";
log.batching(msg, this._context.namespace.connectionId, idleTimeoutInSeconds, this.name);
firstMessageWaitTimer = setTimeout(
actionAfterWaitTimeout,
(idleTimeoutInSeconds as number) * 1000
);
};
if (this.isOpen()) {
this._receiver!.on(ReceiverEvents.message, onReceiveMessage);
this._receiver!.on(ReceiverEvents.receiverDrained, onReceiveDrain);
addCreditAndSetTimer(true);
} else {
const msg =
`MessageSession "${this.name}" with sessionId "${this.sessionId}", ` +
`is already closed. Hence cannot receive messages in a batch.`;
log.error("[%s] %s", this._context.namespace.connectionId, msg);
reject(new Error(msg));
}
});
}
const onReceiveDrain: OnAmqpEvent = () => {
this._receiver!.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
this._receiver!.drain = false;
this.isReceivingMessages = false;
log.messageSession(
"[%s] Receiver '%s' drained. Resolving receiveMessages() with %d messages.",
this._context.namespace.connectionId,
this.name,
brokeredMessages.length
);
resolve(brokeredMessages);
};
const onReceiveError: OnAmqpEvent = (context: EventContext) => {
this.isReceivingMessages = false;
const receiver = this._receiver || context.receiver!;
receiver.removeListener(ReceiverEvents.receiverError, onReceiveError);
receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
receiver.session.removeListener(SessionEvents.sessionError, onSessionError);
const receiverError = context.receiver && context.receiver.error;
let error = new MessagingError("An error occurred while receiving messages.");
if (receiverError) {
error = translate(receiverError);
log.error(
"[%s] Receiver '%s' received an error:\n%O",
this._context.namespace.connectionId,
this.name,
error
);
}
if (totalWaitTimer) {
clearTimeout(totalWaitTimer);
}