Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
source: {
address: this.address
},
credit_window: 0,
onMessage: options.onMessage || ((context: EventContext) => this._onAmqpMessage(context)),
onError: options.onError || ((context: EventContext) => this._onAmqpError(context)),
onClose: options.onClose || ((context: EventContext) => this._onAmqpClose(context)),
onSessionError:
options.onSessionError || ((context: EventContext) => this._onAmqpSessionError(context)),
onSessionClose:
options.onSessionClose || ((context: EventContext) => this._onAmqpSessionClose(context))
};
// Only a numeric ownerLevel is forwarded: it is wrapped with types.wrap_long and stored
// under the Constants.attachEpoch key. This assignment replaces `properties` wholesale.
if (typeof this.ownerLevel === "number") {
rcvrOptions.properties = {
[Constants.attachEpoch]: types.wrap_long(this.ownerLevel)
};
}
// When the trackLastEnqueuedEventProperties option is truthy, set the receiver runtime
// metric name as the desired capability on the receiver options.
if (this.options.trackLastEnqueuedEventProperties) {
rcvrOptions.desired_capabilities = Constants.enableReceiverRuntimeMetricName;
}
const eventPosition = options.eventPosition || this.eventPosition;
if (eventPosition) {
// Set filter on the receiver if event position is specified.
const filterClause = getEventPositionFilter(eventPosition);
if (filterClause) {
(rcvrOptions.source as any).filter = {
"apache.org:selector-filter:string": types.wrap_described(filterClause, 0x468c00000004)
};
}
// Build the receiver options for this.address. For every event handler, a handler
// supplied through `options` wins; otherwise the instance's own handler is used.
// NOTE(review): the instance handlers are passed as bare member references, so they
// presumably must be bound (or arrow-function properties) to keep `this` — confirm.
const rcvrOptions: ReceiverOptions = {
name: this.name,
autoaccept: true,
source: {
address: this.address
},
// The credit window is taken from the configured prefetch count.
credit_window: this.prefetchCount,
onMessage: options.onMessage || this._onAmqpMessage,
onError: options.onError || this._onAmqpError,
onClose: options.onClose || this._onAmqpClose,
onSessionError: options.onSessionError || this._onSessionError,
onSessionClose: options.onSessionClose || this._onSessionClose
};
// An epoch that is neither undefined nor null (so 0 is accepted) is wrapped with
// types.wrap_long and added to the properties, creating the bag on first use.
if (this.epoch !== undefined && this.epoch !== null) {
if (!rcvrOptions.properties) rcvrOptions.properties = {};
rcvrOptions.properties[Constants.attachEpoch] = types.wrap_long(this.epoch);
}
// A truthy identifier is added to the same properties bag under
// Constants.receiverIdentifierName, without disturbing an epoch set above.
if (this.identifier) {
if (!rcvrOptions.properties) rcvrOptions.properties = {};
rcvrOptions.properties[Constants.receiverIdentifierName] = this.identifier;
}
// When runtime metrics are enabled, set the receiver runtime metric name as the
// desired capability.
if (this.receiverRuntimeMetricEnabled) {
rcvrOptions.desired_capabilities = Constants.enableReceiverRuntimeMetricName;
}
const eventPosition = options.eventPosition || this.options.eventPosition;
if (eventPosition) {
// Set filter on the receiver if event position is specified.
const filterClause = eventPosition.getExpression();
if (filterClause) {
(rcvrOptions.source as any).filter = {
"apache.org:selector-filter:string": types.wrap_described(filterClause, 0x468c00000004)
};
// NOTE(review): presumably throws a TypeError when fromSequenceNumber is not a Long —
// confirm against the helper's definition.
throwTypeErrorIfParameterNotLong(connId, "fromSequenceNumber", fromSequenceNumber);
// Checks for maxMessageCount
// A supplied count is checked against the "number" type; a zero or negative count
// returns an empty list immediately, before any request is built.
if (maxMessageCount !== undefined) {
throwTypeErrorIfParameterTypeMismatch(connId, "maxMessageCount", maxMessageCount, "number");
if (maxMessageCount <= 0) {
return [];
}
} else {
// No count supplied: default to a single message.
maxMessageCount = 1;
}
const messageList: ReceivedMessageInfo[] = [];
try {
const messageBody: any = {};
messageBody[Constants.fromSequenceNumber] = types.wrap_long(
Buffer.from(fromSequenceNumber.toBytesBE())
);
messageBody[Constants.messageCount] = types.wrap_int(maxMessageCount);
if (sessionId != undefined) {
messageBody[Constants.sessionIdMapKey] = sessionId;
}
const request: AmqpMessage = {
body: messageBody,
message_id: generate_uuid(),
reply_to: this.replyTo,
application_properties: {
operation: Constants.operations.peekMessage
}
};
const associatedLinkName = this._getAssociatedReceiverName(this._context, sessionId);
if (associatedLinkName) {