Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
const finalAction = () => {
if (this._newMessageReceivedTimer) {
clearTimeout(this._newMessageReceivedTimer);
}
if (firstMessageWaitTimer) {
clearTimeout(firstMessageWaitTimer);
}
// Unsetting the newMessageWaitTimeoutInSeconds to undefined since we are done receiving
// a batch of messages.
setnewMessageWaitTimeoutInSeconds();
// Removing listeners, so that the next receiveMessages() call can set them again.
if (this._receiver) {
this._receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
}
if (this._receiver && this._receiver.credit > 0) {
log.messageSession(
"[%s] Receiver '%s': Draining leftover credits(%d).",
this._context.namespace.connectionId,
this.name,
this._receiver.credit
);
// Setting drain must be accompanied by a flow call (aliased to addCredit in this case).
this._receiver.drain = true;
this._receiver.addCredit(1);
} else {
if (this._receiver) {
this._receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
const finalAction = (): void => {
if (this._newMessageReceivedTimer) {
clearTimeout(this._newMessageReceivedTimer);
}
if (totalWaitTimer) {
clearTimeout(totalWaitTimer);
}
// Unsetting the newMessageWaitTimeoutInSeconds to undefined since we are done receiving
// a batch of messages.
setnewMessageWaitTimeoutInSeconds();
// Removing listeners, so that the next receiveMessages() call can set them again.
if (this._receiver) {
this._receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
}
if (this._receiver && this._receiver.credit > 0) {
log.messageSession(
"[%s] Receiver '%s': Draining leftover credits(%d).",
this._context.namespace.connectionId,
this.name,
this._receiver.credit
);
// Setting drain must be accompanied by a flow call (aliased to addCredit in this case).
this._receiver.drain = true;
this._receiver.addCredit(1);
} else {
if (this._receiver) {
this._receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
},
onSessionError: (context: EventContext) => {
const sessionError = context.session && context.session.error;
if (sessionError) {
console.log(
">>>>> [%s] An error occurred for session of receiver '%s': %O.",
connectionContext.connection.id,
receiverName,
sessionError
);
}
}
};
const receiver: Receiver = await connectionContext.connection.createReceiver(receiverOptions);
receiver.on(ReceiverEvents.message, (context: EventContext) => {
console.log("Received message: %O", context.message);
});
receiver.on(ReceiverEvents.receiverError, (context: EventContext) => {
const receiverError = context.receiver && context.receiver.error;
if (receiverError) {
console.log(
">>>>> [%s] An error occurred for receiver '%s': %O.",
connectionContext.connection.id,
receiverName,
receiverError
);
}
});
// sleeping for 2 mins to let the receiver receive messages and then closing it.
await delay(120000);
await receiver.close();
const sessionError = context.session && context.session.error;
if (sessionError) {
console.log(
">>>>> [%s] An error occurred for session of receiver '%s': %O.",
connectionContext.connection.id,
receiverName,
sessionError
);
}
}
};
const receiver: Receiver = await connectionContext.connection.createReceiver(
receiverOptions
);
receiver.on(ReceiverEvents.message, (context: EventContext) => {
console.log("Received message: %O", context.message);
});
receiver.on(ReceiverEvents.receiverError, (context: EventContext) => {
const receiverError = context.receiver && context.receiver.error;
if (receiverError) {
console.log(
">>>>> [%s] An error occurred for receiver '%s': %O.",
connectionContext.connection.id,
receiverName,
receiverError
);
}
});
// sleeping for 2 mins to let the receiver receive messages and then closing it.
await delay(120000);
await receiver.close();
} catch (completeError) {
const translatedError = translate(completeError);
log.error(
"[%s] An error occurred while completing the message with id '%s' on the " +
"receiver '%s': %O.",
connectionId,
bMessage.messageId,
this.name,
translatedError
);
this._notifyError(translatedError);
}
}
};
// setting the "message" event listener.
this._receiver.on(ReceiverEvents.message, onSessionMessage);
// adding credit
this._receiver!.addCredit(this.maxConcurrentCalls);
} else {
this.isReceivingMessages = false;
const msg =
`MessageSession with sessionId '${this.sessionId}' and name '${this.name}' ` +
`has either not been created or is not open.`;
log.error("[%s] %s", this._context.namespace.connectionId, msg);
this._notifyError(new Error(msg));
}
}
// By adding credit here, we let the service know that at max we can handle `maxMessageCount`
// number of messages concurrently. We will return the user an array of messages that can
// be of size upto maxMessageCount. Then the user needs to accordingly dispose
// (complete,/abandon/defer/deadletter) the messages from the array.
this._receiver!.addCredit(maxMessageCount);
let msg: string = "[%s] Setting the wait timer for %d seconds for receiver '%s'.";
if (reuse) msg += " Receiver link already present, hence reusing it.";
log.batching(msg, this._context.namespace.connectionId, maxWaitTimeInSeconds, this.name);
totalWaitTimer = setTimeout(
actionAfterWaitTimeout,
(maxWaitTimeInSeconds as number) * 1000
);
};
if (this.isOpen()) {
this._receiver!.on(ReceiverEvents.message, onReceiveMessage);
this._receiver!.on(ReceiverEvents.receiverDrained, onReceiveDrain);
addCreditAndSetTimer(true);
} else {
const msg =
`MessageSession "${this.name}" with sessionId "${this.sessionId}", ` +
`is already closed. Hence cannot receive messages in a batch.`;
log.error("[%s] %s", this._context.namespace.connectionId, msg);
reject(new Error(msg));
}
});
}
},
onSessionError: (context: EventContext) => {
const sessionError = context.session && context.session.error;
if (sessionError) {
console.log(
">>>>> [%s] An error occurred for session of receiver '%s': %O.",
connectionContext.connection.id,
receiverName,
sessionError
);
}
}
};
const receiver: Receiver = await connectionContext.connection.createReceiver(receiverOptions);
receiver.on(ReceiverEvents.message, (context: EventContext) => {
console.log("Received message: %O", context.message);
});
receiver.on(ReceiverEvents.receiverError, (context: EventContext) => {
const receiverError = context.receiver && context.receiver.error;
if (receiverError) {
console.log(
">>>>> [%s] An error occurred for receiver '%s': %O.",
connectionContext.connection.id,
receiverName,
receiverError
);
}
});
// sleeping for 2 mins to let the receiver receive messages and then closing it.
await delay(120000);
await receiver.close();
const finalAction = (): void => {
if (this._newMessageReceivedTimer) {
clearTimeout(this._newMessageReceivedTimer);
}
if (totalWaitTimer) {
clearTimeout(totalWaitTimer);
}
// Removing listeners, so that the next receiveMessages() call can set them again.
if (this._receiver) {
this._receiver.removeListener(ReceiverEvents.receiverError, onReceiveError);
this._receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
this._receiver.session.removeListener(SessionEvents.sessionError, onSessionError);
}
if (this.detachedError) {
if (this._receiver) {
this._receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
}
this.isReceivingMessages = false;
const err = translate(this.detachedError);
return reject(err);
}
if (this._receiver && this._receiver.credit > 0) {
log.batching(
"[%s] Receiver '%s': Draining leftover credits(%d).",
this._context.namespace.connectionId,
const actionAfterTimeout = () => {
timeOver = true;
this.receiver.removeListener(ReceiverEvents.message, messageCallback);
if (aborter) {
aborter.removeEventListener("abort", onAbort);
}
const address = this.receiver.address || "address";
const desc: string =
`The request with message_id "${request.message_id}" to "${address}" ` +
`endpoint timed out. Please try again later.`;
const e: Error = {
name: "OperationTimeoutError",
message: desc
};
return reject(translate(e));
};