Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if (!this.isOpen()) {
log.sender(
"Acquiring lock %s for initializing the session, sender and " +
"possibly the connection.",
this.senderLock
);
await defaultLock.acquire(this.senderLock, () => {
return this._init();
});
}
const amqpMessage = toAmqpMessage(data);
amqpMessage.body = this._context.namespace.dataTransformer.encode(data.body);
let encodedMessage;
try {
encodedMessage = RheaMessageUtil.encode(amqpMessage);
} catch (error) {
if (error instanceof TypeError || error.name === "TypeError") {
// `RheaMessageUtil.encode` can fail if message properties are of an invalid type.
// rhea throws errors whose name is `TypeError` but that are not instances of `TypeError`, so catch them too.
// Errors in such cases do not have a user-friendly message or call stack,
// so use `getMessagePropertyTypeMismatchError` to get a better error message.
throw getMessagePropertyTypeMismatchError(data) || error;
}
throw error;
}
log.sender(
"[%s] Sender '%s', trying to send message: %O",
this._context.namespace.connectionId,
this.name,
data
);
if (!this.isOpen()) {
log.sender(
"Acquiring lock %s for initializing the session, sender and " +
"possibly the connection.",
this.senderLock
);
await defaultLock.acquire(this.senderLock, () => {
return this._init();
});
}
const amqpMessage = toAmqpMessage(data);
amqpMessage.body = this._context.namespace.dataTransformer.encode(data.body);
let encodedMessage;
try {
encodedMessage = RheaMessageUtil.encode(amqpMessage);
} catch (error) {
if (error instanceof TypeError || error.name === "TypeError") {
// `RheaMessageUtil.encode` can fail if message properties are of an invalid type.
// rhea throws errors whose name is `TypeError` but that are not instances of `TypeError`, so catch them too.
// Errors in such cases do not have a user-friendly message or call stack,
// so use `getMessagePropertyTypeMismatchError` to get a better error message.
error = getMessagePropertyTypeMismatchError(data) || error;
}
throw error;
}
log.sender(
"[%s] Sender '%s', trying to send message: %O",
this._context.namespace.connectionId,
this.name,
data
);
// Set message_annotations, application_properties and properties of the first message as
// that of the envelope (batch message).
if (amqpMessages[0].message_annotations) {
batchMessage.message_annotations = amqpMessages[0].message_annotations;
}
if (amqpMessages[0].application_properties) {
batchMessage.application_properties = amqpMessages[0].application_properties;
}
for (const prop of messageProperties) {
if ((amqpMessages[0] as any)[prop]) {
(batchMessage as any)[prop] = (amqpMessages[0] as any)[prop];
}
}
// Finally encode the envelope (batch message).
const encodedBatchMessage = RheaMessageUtil.encode(batchMessage);
log.sender(
"[%s]Sender '%s', sending encoded batch message.",
this._context.namespace.connectionId,
this.name,
encodedBatchMessage
);
return await this._trySend(encodedBatchMessage, true);
} catch (err) {
log.error(
"[%s] Sender '%s': An error occurred while sending the messages: %O\nError: %O",
this._context.namespace.connectionId,
this.name,
inputMessages,
err
);
throw err;
// Convert EventData to AmqpMessage.
const amqpMessage = toAmqpMessage(eventData, this._partitionKey);
amqpMessage.body = this._context.dataTransformer.encode(eventData.body);
// Encode every amqp message and then convert every encoded message to amqp data section
this._encodedMessages.push(message.encode(amqpMessage));
const batchMessage: AmqpMessage = {
body: message.data_sections(this._encodedMessages)
};
if (amqpMessage.message_annotations) {
batchMessage.message_annotations = amqpMessage.message_annotations;
}
const encodedBatchMessage = message.encode(batchMessage);
const currentSize = encodedBatchMessage.length;
// this._batchMessage will be used for final send operation
if (currentSize > this._maxSizeInBytes) {
this._encodedMessages.pop();
if (
!previouslyInstrumented &&
Boolean(eventData.properties && eventData.properties[TRACEPARENT_PROPERTY])
) {
this._spanContexts.pop();
}
return false;
}
this._batchMessage = encodedBatchMessage;
this._sizeInBytes = currentSize;
this._count++;
}
log.sender(
"[%s] Sender '%s', trying to send Message[]: %O",
this._context.namespace.connectionId,
this.name,
inputMessages
);
const amqpMessages: AmqpMessage[] = [];
const encodedMessages = [];
// Convert Message to AmqpMessage.
for (let i = 0; i < inputMessages.length; i++) {
const amqpMessage = toAmqpMessage(inputMessages[i]);
amqpMessage.body = this._context.namespace.dataTransformer.encode(inputMessages[i].body);
amqpMessages[i] = amqpMessage;
try {
encodedMessages[i] = RheaMessageUtil.encode(amqpMessage);
} catch (error) {
if (error instanceof TypeError || error.name === "TypeError") {
// `RheaMessageUtil.encode` can fail if message properties are of an invalid type.
// rhea throws errors whose name is `TypeError` but that are not instances of `TypeError`, so catch them too.
// Errors in such cases do not have a user-friendly message or call stack,
// so use `getMessagePropertyTypeMismatchError` to get a better error message.
error = getMessagePropertyTypeMismatchError(inputMessages[i]) || error;
}
throw error;
}
}
// Convert every encoded message to amqp data section
const batchMessage: AmqpMessage = {
body: RheaMessageUtil.data_sections(encodedMessages)
};
// check if the event has already been instrumented
const previouslyInstrumented = Boolean(
eventData.properties && eventData.properties[TRACEPARENT_PROPERTY]
);
if (!previouslyInstrumented) {
const messageSpan = createMessageSpan(options.parentSpan);
eventData = instrumentEventData(eventData, messageSpan);
this._spanContexts.push(messageSpan.context());
messageSpan.end();
}
// Convert EventData to AmqpMessage.
const amqpMessage = toAmqpMessage(eventData, this._partitionKey);
amqpMessage.body = this._context.dataTransformer.encode(eventData.body);
// Encode every amqp message and then convert every encoded message to amqp data section
this._encodedMessages.push(message.encode(amqpMessage));
const batchMessage: AmqpMessage = {
body: message.data_sections(this._encodedMessages)
};
if (amqpMessage.message_annotations) {
batchMessage.message_annotations = amqpMessage.message_annotations;
}
const encodedBatchMessage = message.encode(batchMessage);
const currentSize = encodedBatchMessage.length;
// this._batchMessage will be used for final send operation
if (currentSize > this._maxSizeInBytes) {
this._encodedMessages.pop();
if (
message.body = this._context.dataTransformer.encode(events[i].body);
messages[i] = message;
}
// Encode every amqp message and then convert every encoded message to amqp data section
const batchMessage: AmqpMessage = {
body: message.data_sections(messages.map(message.encode))
};
// Set message_annotations of the first message as
// that of the envelope (batch message).
if (messages[0].message_annotations) {
batchMessage.message_annotations = messages[0].message_annotations;
}
// Finally encode the envelope (batch message).
encodedBatchMessage = message.encode(batchMessage);
}
logger.info(
"[%s] Sender '%s', sending encoded batch message.",
this._context.connectionId,
this.name,
encodedBatchMessage
);
return await this._trySendBatch(encodedBatchMessage, options);
} catch (err) {
logger.warning("An error occurred while sending the batch message %O", err);
logErrorStackTrace(err);
throw err;
}
}
}
log.sender(
"[%s] Sender '%s', trying to send Message[]: %O",
this._context.namespace.connectionId,
this.name,
inputMessages
);
const amqpMessages: AmqpMessage[] = [];
const encodedMessages = [];
// Convert Message to AmqpMessage.
for (let i = 0; i < inputMessages.length; i++) {
const amqpMessage = toAmqpMessage(inputMessages[i]);
amqpMessage.body = this._context.namespace.dataTransformer.encode(inputMessages[i].body);
amqpMessages[i] = amqpMessage;
try {
encodedMessages[i] = RheaMessageUtil.encode(amqpMessage);
} catch (error) {
if (error instanceof TypeError || error.name === "TypeError") {
throw getMessagePropertyTypeMismatchError(inputMessages[i]) || error;
}
throw error;
}
}
// Convert every encoded message to amqp data section
const batchMessage: AmqpMessage = {
body: RheaMessageUtil.data_sections(encodedMessages)
};
// Set message_annotations, application_properties and properties of the first message as
// that of the envelope (batch message).
if (amqpMessages[0].message_annotations) {
batchMessage.message_annotations = amqpMessages[0].message_annotations;
// Set message_annotations, application_properties and properties of the first message as
// that of the envelope (batch message).
if (amqpMessages[0].message_annotations) {
batchMessage.message_annotations = amqpMessages[0].message_annotations;
}
if (amqpMessages[0].application_properties) {
batchMessage.application_properties = amqpMessages[0].application_properties;
}
for (const prop of messageProperties) {
if ((amqpMessages[0] as any)[prop]) {
(batchMessage as any)[prop] = (amqpMessages[0] as any)[prop];
}
}
// Finally encode the envelope (batch message).
const encodedBatchMessage = RheaMessageUtil.encode(batchMessage);
log.sender(
"[%s]Sender '%s', sending encoded batch message.",
this._context.namespace.connectionId,
this.name,
encodedBatchMessage
);
return await this._trySend(encodedBatchMessage, true);
} catch (err) {
log.error(
"[%s] Sender '%s': An error occurred while sending the messages: %O\nError: %O",
this._context.namespace.connectionId,
this.name,
inputMessages,
err
);
throw err;
async scheduleMessages(
scheduledEnqueueTimeUtc: Date,
messages: SendableMessageInfo[]
): Promise {
throwErrorIfConnectionClosed(this._context.namespace);
const messageBody: any[] = [];
for (let i = 0; i < messages.length; i++) {
const item = messages[i];
if (!item.messageId) item.messageId = generate_uuid();
item.scheduledEnqueueTimeUtc = scheduledEnqueueTimeUtc;
const amqpMessage = toAmqpMessage(item);
try {
const entry: any = {
message: RheaMessageUtil.encode(amqpMessage),
"message-id": item.messageId
};
if (item.sessionId) {
entry[Constants.sessionIdMapKey] = item.sessionId;
}
if (item.partitionKey) {
entry["partition-key"] = item.partitionKey;
}
if (item.viaPartitionKey) {
entry["via-partition-key"] = item.viaPartitionKey;
}
const wrappedEntry = types.wrap_map(entry);
messageBody.push(wrappedEntry);
} catch (err) {
let error: Error;