Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
const finalAction = () => {
if (this._newMessageReceivedTimer) {
clearTimeout(this._newMessageReceivedTimer);
}
if (firstMessageWaitTimer) {
clearTimeout(firstMessageWaitTimer);
}
// Unsetting the newMessageWaitTimeoutInSeconds to undefined since we are done receiving
// a batch of messages.
setnewMessageWaitTimeoutInSeconds();
// Removing listeners, so that the next receiveMessages() call can set them again.
if (this._receiver) {
this._receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
}
if (this._receiver && this._receiver.credit > 0) {
log.messageSession(
"[%s] Receiver '%s': Draining leftover credits(%d).",
this._context.namespace.connectionId,
this.name,
this._receiver.credit
);
// Setting drain must be accompanied by a flow call (aliased to addCredit in this case).
this._receiver.drain = true;
this._receiver.addCredit(1);
} else {
if (this._receiver) {
this._receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
const finalAction = (): void => {
if (this._newMessageReceivedTimer) {
clearTimeout(this._newMessageReceivedTimer);
}
if (totalWaitTimer) {
clearTimeout(totalWaitTimer);
}
// Unsetting the newMessageWaitTimeoutInSeconds to undefined since we are done receiving
// a batch of messages.
setnewMessageWaitTimeoutInSeconds();
// Removing listeners, so that the next receiveMessages() call can set them again.
if (this._receiver) {
this._receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
}
if (this._receiver && this._receiver.credit > 0) {
log.messageSession(
"[%s] Receiver '%s': Draining leftover credits(%d).",
this._context.namespace.connectionId,
this.name,
this._receiver.credit
);
// Setting drain must be accompanied by a flow call (aliased to addCredit in this case).
this._receiver.drain = true;
this._receiver.addCredit(1);
} else {
if (this._receiver) {
this._receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
},
onSessionError: (context: EventContext) => {
const sessionError = context.session && context.session.error;
if (sessionError) {
console.log(
">>>>> [%s] An error occurred for session of receiver '%s': %O.",
connectionContext.connection.id,
receiverName,
sessionError
);
}
}
};
const receiver: Receiver = await connectionContext.connection.createReceiver(receiverOptions);
receiver.on(ReceiverEvents.message, (context: EventContext) => {
console.log("Received message: %O", context.message);
});
receiver.on(ReceiverEvents.receiverError, (context: EventContext) => {
const receiverError = context.receiver && context.receiver.error;
if (receiverError) {
console.log(
">>>>> [%s] An error occurred for receiver '%s': %O.",
connectionContext.connection.id,
receiverName,
receiverError
);
}
});
// sleeping for 2 mins to let the receiver receive messages and then closing it.
await delay(120000);
await receiver.close();
const sessionError = context.session && context.session.error;
if (sessionError) {
console.log(
">>>>> [%s] An error occurred for session of receiver '%s': %O.",
connectionContext.connection.id,
receiverName,
sessionError
);
}
}
};
const receiver: Receiver = await connectionContext.connection.createReceiver(
receiverOptions
);
receiver.on(ReceiverEvents.message, (context: EventContext) => {
console.log("Received message: %O", context.message);
});
receiver.on(ReceiverEvents.receiverError, (context: EventContext) => {
const receiverError = context.receiver && context.receiver.error;
if (receiverError) {
console.log(
">>>>> [%s] An error occurred for receiver '%s': %O.",
connectionContext.connection.id,
receiverName,
receiverError
);
}
});
// sleeping for 2 mins to let the receiver receive messages and then closing it.
await delay(120000);
await receiver.close();
} catch (completeError) {
const translatedError = translate(completeError);
log.error(
"[%s] An error occurred while completing the message with id '%s' on the " +
"receiver '%s': %O.",
connectionId,
bMessage.messageId,
this.name,
translatedError
);
this._notifyError(translatedError);
}
}
};
// setting the "message" event listener.
this._receiver.on(ReceiverEvents.message, onSessionMessage);
// adding credit
this._receiver!.addCredit(this.maxConcurrentCalls);
} else {
this.isReceivingMessages = false;
const msg =
`MessageSession with sessionId '${this.sessionId}' and name '${this.name}' ` +
`has either not been created or is not open.`;
log.error("[%s] %s", this._context.namespace.connectionId, msg);
this._notifyError(new Error(msg));
}
}
// By adding credit here, we let the service know that we can handle at most `maxMessageCount`
// messages concurrently. We will return to the user an array of messages whose size can be
// up to maxMessageCount. The user then needs to dispose of
// (complete/abandon/defer/deadletter) each message in the array accordingly.
this._receiver!.addCredit(maxMessageCount);
let msg: string = "[%s] Setting the wait timer for %d seconds for receiver '%s'.";
if (reuse) msg += " Receiver link already present, hence reusing it.";
log.batching(msg, this._context.namespace.connectionId, maxWaitTimeInSeconds, this.name);
totalWaitTimer = setTimeout(
actionAfterWaitTimeout,
(maxWaitTimeInSeconds as number) * 1000
);
};
if (this.isOpen()) {
this._receiver!.on(ReceiverEvents.message, onReceiveMessage);
this._receiver!.on(ReceiverEvents.receiverDrained, onReceiveDrain);
addCreditAndSetTimer(true);
} else {
const msg =
`MessageSession "${this.name}" with sessionId "${this.sessionId}", ` +
`is already closed. Hence cannot receive messages in a batch.`;
log.error("[%s] %s", this._context.namespace.connectionId, msg);
reject(new Error(msg));
}
});
}
},
onSessionError: (context: EventContext) => {
const sessionError = context.session && context.session.error;
if (sessionError) {
console.log(
">>>>> [%s] An error occurred for session of receiver '%s': %O.",
connectionContext.connection.id,
receiverName,
sessionError
);
}
}
};
const receiver: Receiver = await connectionContext.connection.createReceiver(receiverOptions);
receiver.on(ReceiverEvents.message, (context: EventContext) => {
console.log("Received message: %O", context.message);
});
receiver.on(ReceiverEvents.receiverError, (context: EventContext) => {
const receiverError = context.receiver && context.receiver.error;
if (receiverError) {
console.log(
">>>>> [%s] An error occurred for receiver '%s': %O.",
connectionContext.connection.id,
receiverName,
receiverError
);
}
});
// sleeping for 2 mins to let the receiver receive messages and then closing it.
await delay(120000);
await receiver.close();
async function main(): Promise {
await authenticate(ehConnectionConfig.getReceiverAudience("0"));
const receiverName = "receiver-1";
// Get messages from the past hour
const filterClause = `amqp.annotation.x-opt-enqueued-time > '${Date.now() - 3600 * 1000}'`;
const receiverAddress = ehConnectionConfig.getReceiverAddress("0");
const receiverOptions: ReceiverOptions = {
name: receiverName,
source: {
address: receiverAddress,
filter: {
"apache.org:selector-filter:string": types.wrap_described(filterClause, 0x468c00000004)
}
},
onSessionError: (context: EventContext) => {
const sessionError = context.session && context.session.error;
if (sessionError) {
console.log(
">>>>> [%s] An error occurred for session of receiver '%s': %O.",
connectionContext.connection.id,
receiverName,
sessionError
);
}
}
};
const receiver: Receiver = await connectionContext.connection.createReceiver(receiverOptions);
async function main(): Promise {
await authenticate(`${connectionConfig.endpoint}${path}`, false);
const receiverName = "receiver-1";
const filterClause = `amqp.annotation.x-opt-enqueued-time > '${Date.now() - 3600 * 1000}'`; // Get messages from the past hour
const receiverAddress = `${path}/ConsumerGroups/$default/Partitions/0`; // For ServiceBus ""
const receiverOptions: ReceiverOptions = {
name: receiverName,
source: {
address: receiverAddress,
filter: {
// May not be required for ServiceBus. The current example is for EventHubs.
"apache.org:selector-filter:string": types.wrap_described(filterClause, 0x468c00000004)
}
},
onSessionError: (context: EventContext) => {
const sessionError = context.session && context.session.error;
if (sessionError) {
console.log(
">>>>> [%s] An error occurred for session of receiver '%s': %O.",
connectionContext.connection.id,
receiverName,
sessionError
);
}
}
};
const receiver: Receiver = await connectionContext.connection.createReceiver(receiverOptions);
async function main(): Promise {
await authenticate(ehConnectionConfig.getReceiverAudience("0"));
const receiverName = "receiver-1";
// Get messages from the past hour
const filterClause = `amqp.annotation.x-opt-enqueued-time > '${Date.now() -
3600 * 1000}'`;
const receiverAddress = ehConnectionConfig.getReceiverAddress("0");
const receiverOptions: ReceiverOptions = {
name: receiverName,
source: {
address: receiverAddress,
filter: {
"apache.org:selector-filter:string": types.wrap_described(
filterClause,
0x468c00000004
)
}
},
onSessionError: (context: EventContext) => {
const sessionError = context.session && context.session.error;
if (sessionError) {
console.log(
">>>>> [%s] An error occurred for session of receiver '%s': %O.",
connectionContext.connection.id,
receiverName,
sessionError
);
}
}