Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
encodedMessages[i] = RheaMessageUtil.encode(amqpMessage);
} catch (error) {
if (error instanceof TypeError || error.name === "TypeError") {
// `RheaMessageUtil.encode` can fail if message properties are of invalid type
// rhea throws errors with name `TypeError` but not an instance of `TypeError`, so catch them too
// Errors in such cases do not have user friendy message or call stack
// So use `getMessagePropertyTypeMismatchError` to get a better error message
error = getMessagePropertyTypeMismatchError(inputMessages[i]) || error;
}
throw error;
}
}
// Convert every encoded message to amqp data section
const batchMessage: AmqpMessage = {
body: RheaMessageUtil.data_sections(encodedMessages)
};
// Set message_annotations, application_properties and properties of the first message as
// that of the envelope (batch message).
if (amqpMessages[0].message_annotations) {
batchMessage.message_annotations = amqpMessages[0].message_annotations;
}
if (amqpMessages[0].application_properties) {
batchMessage.application_properties = amqpMessages[0].application_properties;
}
for (const prop of messageProperties) {
if ((amqpMessages[0] as any)[prop]) {
(batchMessage as any)[prop] = (amqpMessages[0] as any)[prop];
}
}
// Finally encode the envelope (batch message).
let encodedBatchMessage: Buffer | undefined;
if (isEventDataBatch(events)) {
encodedBatchMessage = events._message!;
} else {
const partitionKey = (options && options.partitionKey) || undefined;
const messages: AmqpMessage[] = [];
// Convert EventData to AmqpMessage.
for (let i = 0; i < events.length; i++) {
const message = toAmqpMessage(events[i], partitionKey);
message.body = this._context.dataTransformer.encode(events[i].body);
messages[i] = message;
}
// Encode every amqp message and then convert every encoded message to amqp data section
const batchMessage: AmqpMessage = {
body: message.data_sections(messages.map(message.encode))
};
// Set message_annotations of the first message as
// that of the envelope (batch message).
if (messages[0].message_annotations) {
batchMessage.message_annotations = messages[0].message_annotations;
}
// Finally encode the envelope (batch message).
encodedBatchMessage = message.encode(batchMessage);
}
logger.info(
"[%s] Sender '%s', sending encoded batch message.",
this._context.connectionId,
this.name,
encodedBatchMessage
);
if (!previouslyInstrumented) {
const messageSpan = createMessageSpan(options.parentSpan);
eventData = instrumentEventData(eventData, messageSpan);
this._spanContexts.push(messageSpan.context());
messageSpan.end();
}
// Convert EventData to AmqpMessage.
const amqpMessage = toAmqpMessage(eventData, this._partitionKey);
amqpMessage.body = this._context.dataTransformer.encode(eventData.body);
// Encode every amqp message and then convert every encoded message to amqp data section
this._encodedMessages.push(message.encode(amqpMessage));
const batchMessage: AmqpMessage = {
body: message.data_sections(this._encodedMessages)
};
if (amqpMessage.message_annotations) {
batchMessage.message_annotations = amqpMessage.message_annotations;
}
const encodedBatchMessage = message.encode(batchMessage);
const currentSize = encodedBatchMessage.length;
// this._batchMessage will be used for final send operation
if (currentSize > this._maxSizeInBytes) {
this._encodedMessages.pop();
if (
!previouslyInstrumented &&
Boolean(eventData.properties && eventData.properties[TRACEPARENT_PROPERTY])
) {
const amqpMessage = toAmqpMessage(inputMessages[i]);
amqpMessage.body = this._context.namespace.dataTransformer.encode(inputMessages[i].body);
amqpMessages[i] = amqpMessage;
try {
encodedMessages[i] = RheaMessageUtil.encode(amqpMessage);
} catch (error) {
if (error instanceof TypeError || error.name === "TypeError") {
throw getMessagePropertyTypeMismatchError(inputMessages[i]) || error;
}
throw error;
}
}
// Convert every encoded message to amqp data section
const batchMessage: AmqpMessage = {
body: RheaMessageUtil.data_sections(encodedMessages)
};
// Set message_annotations, application_properties and properties of the first message as
// that of the envelope (batch message).
if (amqpMessages[0].message_annotations) {
batchMessage.message_annotations = amqpMessages[0].message_annotations;
}
if (amqpMessages[0].application_properties) {
batchMessage.application_properties = amqpMessages[0].application_properties;
}
for (const prop of messageProperties) {
if ((amqpMessages[0] as any)[prop]) {
(batchMessage as any)[prop] = (amqpMessages[0] as any)[prop];
}
}
// Finally encode the envelope (batch message).