Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
throwErrorIfConnectionClosed(this._context.namespace);
if (!options) options = {};
if (options.delayInSeconds == null) options.delayInSeconds = 1;
if (options.timeoutInSeconds == null) options.timeoutInSeconds = 5;
if (options.times == null) options.times = 5;
try {
const messageBody: any = {};
messageBody[Constants.sessionIdMapKey] = sessionId;
const request: AmqpMessage = {
body: messageBody,
reply_to: this.replyTo,
application_properties: {
operation: Constants.operations.renewSessionLock
}
};
request.application_properties![Constants.trackingId] = generate_uuid();
const associatedLinkName = this._getAssociatedReceiverName(this._context, sessionId);
if (associatedLinkName) {
request.application_properties![Constants.associatedLinkName] = associatedLinkName;
}
log.mgmt(
"[%s] Renew Session Lock request body: %O.",
this._context.namespace.connectionId,
request.body
);
log.mgmt(
"[%s] Acquiring lock to get the management req res link.",
this._context.namespace.connectionId
);
await defaultLock.acquire(this.managementLock, () => {
return this._init();
});
if (sessionId != null) {
messageBody[Constants.sessionIdMapKey] = sessionId;
}
const request: AmqpMessage = {
body: messageBody,
message_id: generate_uuid(),
reply_to: this.replyTo,
application_properties: {
operation: Constants.operations.receiveBySequenceNumber
}
};
const associatedLinkName = this._getAssociatedReceiverName(this._context, sessionId);
if (associatedLinkName) {
request.application_properties![Constants.associatedLinkName] = associatedLinkName;
}
request.application_properties![Constants.trackingId] = generate_uuid();
log.mgmt(
"[%s] Receive deferred messages request body: %O.",
this._context.namespace.connectionId,
request.body
);
log.mgmt(
"[%s] Acquiring lock to get the management req res link.",
this._context.namespace.connectionId
);
await defaultLock.acquire(this.managementLock, () => {
return this._init();
});
const result = await this._mgmtReqResLink!.sendRequest(request);
const messages = result.body.messages as {
message: Buffer;
this._context.namespace.connectionId,
"ruleName",
ruleName
);
try {
const request: AmqpMessage = {
body: {
"rule-name": types.wrap_string(ruleName)
},
reply_to: this.replyTo,
application_properties: {
operation: Constants.operations.removeRule
}
};
request.application_properties![Constants.trackingId] = generate_uuid();
log.mgmt(
"[%s] Remove Rule request body: %O.",
this._context.namespace.connectionId,
request.body
);
log.mgmt(
"[%s] Acquiring lock to get the management req res link.",
this._context.namespace.connectionId
);
await defaultLock.acquire(this.managementLock, () => {
return this._init();
});
await this._mgmtReqResLink!.sendRequest(request);
} catch (err) {
maxMessageCount = 1;
}
const messageList: ReceivedMessageInfo[] = [];
try {
const messageBody: any = {};
messageBody[Constants.fromSequenceNumber] = types.wrap_long(
Buffer.from(fromSequenceNumber.toBytesBE())
);
messageBody[Constants.messageCount] = types.wrap_int(maxMessageCount);
if (sessionId != undefined) {
messageBody[Constants.sessionIdMapKey] = sessionId;
}
const request: AmqpMessage = {
body: messageBody,
message_id: generate_uuid(),
reply_to: this.replyTo,
application_properties: {
operation: Constants.operations.peekMessage
}
};
const associatedLinkName = this._getAssociatedReceiverName(this._context, sessionId);
if (associatedLinkName) {
request.application_properties![Constants.associatedLinkName] = associatedLinkName;
}
request.application_properties![Constants.trackingId] = generate_uuid();
log.mgmt(
"[%s] Peek by sequence number request body: %O.",
this._context.namespace.connectionId,
request.body
);
log.mgmt(
async claimOwnership(partitionOwnership: PartitionOwnership[]): Promise {
const claimedOwnerships = [];
for (const ownership of partitionOwnership) {
if (
!this._partitionOwnershipMap.has(ownership.partitionId) ||
this._partitionOwnershipMap.get(ownership.partitionId)!.etag === ownership.etag
) {
var date = new Date();
const newOwnership = {
...ownership,
etag: generate_uuid(),
lastModifiedTimeInMs: date.getTime()
};
this._partitionOwnershipMap.set(newOwnership.partitionId, newOwnership);
claimedOwnerships.push(newOwnership);
}
}
return claimedOwnerships;
}
async updateCheckpoint(checkpoint: Checkpoint): Promise {
throwTypeErrorIfParameterMissing(
"",
"updateCheckpoint",
"sequenceNumber",
checkpoint.sequenceNumber
);
throwTypeErrorIfParameterMissing("", "updateCheckpoint", "offset", checkpoint.offset);
checkpoint = { ...checkpoint };
const partitionOwnership = this._partitionOwnershipMap.get(checkpoint.partitionId);
if (partitionOwnership) {
partitionOwnership.etag = generate_uuid();
const key = `${checkpoint.fullyQualifiedNamespace}:${checkpoint.eventHubName}:${checkpoint.consumerGroup}`;
let partitionMap = this._committedCheckpoints.get(key);
if (partitionMap == null) {
partitionMap = new Map();
this._committedCheckpoints.set(key, partitionMap);
}
partitionMap.set(checkpoint.partitionId, checkpoint);
}
}
i,
error
);
throw error;
}
}
try {
messageBody[Constants.sequenceNumbers] = types.wrap_array(
messageBody[Constants.sequenceNumbers],
0x81,
undefined
);
const request: AmqpMessage = {
body: messageBody,
message_id: generate_uuid(),
reply_to: this.replyTo,
application_properties: {
operation: Constants.operations.cancelScheduledMessage
}
};
if (this._context.sender) {
request.application_properties![Constants.associatedLinkName] = this._context.sender!.name;
}
request.application_properties![Constants.trackingId] = generate_uuid();
log.mgmt(
"[%s] Cancel scheduled messages request body: %O.",
this._context.namespace.connectionId,
request.body
);
log.mgmt(
constructor(topicName: string, context: ConnectionContext) {
throwErrorIfConnectionClosed(context);
this.entityPath = String(topicName);
this.id = `${this.entityPath}/${generate_uuid()}`;
this._context = ClientEntityContext.create(
this.entityPath,
ClientType.TopicClient,
context,
this.id
);
}