Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
): Promise {
if (this._isManagingSessions) {
throw new Error(
`${entityType}Client for "${this._context.namespace.config.entityPath}" ` +
`is already receiving messages from sessions. Please close this ${entityType}Client or ` +
`create a new one and receiveMessages from Sessions.`
);
}
this._isManagingSessions = true;
this._isCancelRequested = false;
if (!options) options = {};
if (options.maxConcurrentSessions) this.maxConcurrentSessions = options.maxConcurrentSessions;
// We are explicitly configuring the messageSession to timeout in 60 seconds (if not provided
// by the user) when no new messages are received.
if (!options.newMessageWaitTimeoutInSeconds) {
options.newMessageWaitTimeoutInSeconds = Constants.defaultOperationTimeoutInSeconds;
}
this._maxConcurrentSessionsSemaphore = new Semaphore(this.maxConcurrenSessions);
this._maxPendingAcceptSessionsSemaphore = new Semaphore(
this.maxConcurrentAcceptSessionRequests
);
for (let i = 0; i < this._maxConcurrentAcceptSessionRequests; i++) {
this._acceptSessionAndReceiveMessages(onMessage, onError, options).catch((err) => {
log.error(err);
});
}
}async receiveMessages(
maxMessageCount: number,
maxWaitTimeInSeconds?: number
): Promise {
if (maxWaitTimeInSeconds == null) {
maxWaitTimeInSeconds = Constants.defaultOperationTimeoutInSeconds;
}
const brokeredMessages: ServiceBusMessage[] = [];
this.isReceivingMessages = true;
return new Promise((resolve, reject) => {
let totalWaitTimer: any;
const setnewMessageWaitTimeoutInSeconds = (value?: number): void => {
this.newMessageWaitTimeoutInSeconds = value;
};
setnewMessageWaitTimeoutInSeconds(1);
// Action to be performed on the "receiver_drained" event.
const onReceiveDrain: OnAmqpEvent = () => {const timer = setTimeout(() => {
this._deliveryDispositionMap.delete(delivery.id);
log.receiver(
"[%s] Disposition for delivery id: %d, did not complete in %d milliseconds. " +
"Hence rejecting the promise with timeout error.",
this._context.namespace.connectionId,
delivery.id,
Constants.defaultOperationTimeoutInSeconds * 1000
);
const e: AmqpError = {
condition: ErrorNameConditionMapper.ServiceUnavailableError,
description:
"Operation to settle the message has timed out. The disposition of the " +
"message may or may not be successful"
};
return reject(translate(e));
}, Constants.defaultOperationTimeoutInSeconds * 1000);
this._deliveryDispositionMap.set(delivery.id, {async receiveMessages(
maxMessageCount: number,
idleTimeoutInSeconds?: number
): Promise {
if (idleTimeoutInSeconds == undefined) {
idleTimeoutInSeconds = Constants.defaultOperationTimeoutInSeconds;
}
const brokeredMessages: ServiceBusMessage[] = [];
this.isReceivingMessages = true;
return new Promise((resolve, reject) => {
let onReceiveMessage: OnAmqpEventAsPromise;
let onReceiveDrain: OnAmqpEvent;
let firstMessageWaitTimer: any;
let actionAfterWaitTimeout: Func;
const setnewMessageWaitTimeoutInSeconds = (value?: number) => {
this.newMessageWaitTimeoutInSeconds = value;
};
setnewMessageWaitTimeoutInSeconds(1);log.receiver(
"[%s] Disposition for delivery id: %d, did not complete in %d milliseconds. " +
"Hence rejecting the promise with timeout error.",
this._context.namespace.connectionId,
delivery.id,
Constants.defaultOperationTimeoutInSeconds * 1000
);
const e: AmqpError = {
condition: ErrorNameConditionMapper.ServiceUnavailableError,
description:
"Operation to settle the message has timed out. The disposition of the " +
"message may or may not be successful"
};
return reject(translate(e));
}, Constants.defaultOperationTimeoutInSeconds * 1000);
this._deliveryDispositionMap.set(delivery.id, {
resolve: resolve,
reject: reject,
timer: timer
});
if (operation === DispositionType.complete) {
delivery.accept();
} else if (operation === DispositionType.abandon) {
const params: any = {
undeliverable_here: false
};
if (options.propertiesToModify) params.message_annotations = options.propertiesToModify;
delivery.modified(params);
} else if (operation === DispositionType.defer) {
const params: any = {
undeliverable_here: true`to operation timeout.`;
log.error(desc);
const e: AmqpError = {
condition: ErrorNameConditionMapper.ServiceUnavailableError,
description: desc
};
return reject(translate(e));
};
this._sender!.on(SenderEvents.accepted, onAccepted);
this._sender!.on(SenderEvents.rejected, onRejected);
this._sender!.on(SenderEvents.modified, onModified);
this._sender!.on(SenderEvents.released, onReleased);
waitTimer = setTimeout(
actionAfterTimeout,
Constants.defaultOperationTimeoutInSeconds * 1000
);
try {
const delivery = this._sender!.send(
encodedMessage,
undefined,
sendBatch ? 0x80013700 : 0
);
log.sender(
"[%s] Sender '%s', sent message with delivery id: %d",
this._context.namespace.connectionId,
this.name,
delivery.id
);
} catch (error) {
removeListeners();
return reject(error);receive(maxMessageCount: number, maxWaitTimeInSeconds?: number): Promise {
throwErrorIfConnectionClosed(this._context.namespace);
if (maxWaitTimeInSeconds == null) {
maxWaitTimeInSeconds = Constants.defaultOperationTimeoutInSeconds;
}
const brokeredMessages: ServiceBusMessage[] = [];
this.isReceivingMessages = true;
return new Promise((resolve, reject) => {
let totalWaitTimer: NodeJS.Timer | undefined;
const onSessionError: OnAmqpEvent = (context: EventContext) => {
this.isReceivingMessages = false;
const receiver = this._receiver || context.receiver!;
receiver.removeListener(ReceiverEvents.receiverError, onReceiveError);
receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
receiver.session.removeListener(SessionEvents.sessionError, onSessionError);const timer = setTimeout(() => {
this._deliveryDispositionMap.delete(delivery.id);
log.receiver(
"[%s] Disposition for delivery id: %d, did not complete in %d milliseconds. " +
"Hence rejecting the promise with timeout error",
this._context.namespace.connectionId,
delivery.id,
Constants.defaultOperationTimeoutInSeconds * 1000
);
const e: AmqpError = {
condition: ErrorNameConditionMapper.ServiceUnavailableError,
description:
"Operation to settle the message has timed out. The disposition of the " +
"message may or may not be successful"
};
return reject(translate(e));
}, Constants.defaultOperationTimeoutInSeconds * 1000);
this._deliveryDispositionMap.set(delivery.id, {`to operation timeout.`;
log.error(desc);
const e: AmqpError = {
condition: ErrorNameConditionMapper.ServiceUnavailableError,
description: desc
};
return reject(translate(e));
};
this._sender!.on(SenderEvents.accepted, onAccepted);
this._sender!.on(SenderEvents.rejected, onRejected);
this._sender!.on(SenderEvents.modified, onModified);
this._sender!.on(SenderEvents.released, onReleased);
waitTimer = setTimeout(
actionAfterTimeout,
Constants.defaultOperationTimeoutInSeconds * 1000
);
try {
const delivery = this._sender!.send(
encodedMessage,
undefined,
sendBatch ? 0x80013700 : 0
);
log.sender(
"[%s] Sender '%s', sent message with delivery id: %d",
this._context.namespace.connectionId,
this.name,
delivery.id
);
} catch (error) {
removeListeners();
return reject(error);receive(maxMessageCount: number, idleTimeoutInSeconds?: number): Promise {
throwErrorIfConnectionClosed(this._context.namespace);
if (idleTimeoutInSeconds == null) {
idleTimeoutInSeconds = Constants.defaultOperationTimeoutInSeconds;
}
const brokeredMessages: ServiceBusMessage[] = [];
this.isReceivingMessages = true;
return new Promise((resolve, reject) => {
let totalWaitTimer: NodeJS.Timer | undefined;
const onSessionError: OnAmqpEvent = (context: EventContext) => {
this.isReceivingMessages = false;
const receiver = this._receiver || context.receiver!;
receiver.removeListener(ReceiverEvents.receiverError, onReceiveError);
receiver.removeListener(ReceiverEvents.message, onReceiveMessage);
receiver.removeListener(ReceiverEvents.receiverDrained, onReceiveDrain);
receiver.session.removeListener(SessionEvents.sessionError, onSessionError);