Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
/**
 * Build the test granule payload and publish it as a message to an SQS queue.
 *
 * @param {string} queue - URL of the SQS queue that receives the payload
 * @returns {Promise<string>} granuleId of the first granule in the prepared payload
 */
async function ingestGranule(queue) {
  const rawPayload = fs.readFileSync(inputPayloadFilename, 'utf8');

  // setupTestGranuleForIngest prepares the payload for this test run
  // (the test data file paths are updated there).
  const preparedPayload = await setupTestGranuleForIngest(
    config.bucket,
    rawPayload,
    granuleRegex,
    testSuffix,
    testDataFolder
  );
  const firstGranule = preparedPayload.granules[0];
  const { granuleId } = firstGranule;

  const sendParams = {
    QueueUrl: queue,
    MessageBody: JSON.stringify(preparedPayload)
  };
  await sqs().sendMessage(sendParams).promise();

  return granuleId;
}
async validateAndUpdateSqsRule(rule) {
const queueUrl = rule.rule.value;
if (!(await aws.sqsQueueExists(queueUrl))) {
throw new Error(`SQS queue ${queueUrl} does not exist or your account does not have permissions to access it`);
}
const qAttrParams = {
QueueUrl: queueUrl,
AttributeNames: ['All']
};
const attributes = await aws.sqs().getQueueAttributes(qAttrParams).promise();
if (!attributes.Attributes.RedrivePolicy) {
throw new Error(`SQS queue ${rule} does not have a dead-letter queue configured`);
}
// update rule meta
if (!get(rule, 'meta.visibilityTimeout')) {
set(rule, 'meta.visibilityTimeout', parseInt(attributes.Attributes.VisibilityTimeout, 10));
}
if (!get(rule, 'meta.retries')) set(rule, 'meta.retries', 3);
return rule;
}
// Source queue URL and receipt handle recorded on the event message's eventSource.
const {
queueUrl,
receiptHandle
} = eventMessage.meta.eventSource;
if (isFailedSfStatus(eventStatus)) {
// Failed status: set the message's visibilityTimeout to 5s so the message can be retried
log.debug(`update message ${receiptHandle} queue ${queueUrl} visibilityTimeout to 5s`);
const params = {
QueueUrl: queueUrl,
ReceiptHandle: receiptHandle,
VisibilityTimeout: 5
};
await sqs().changeMessageVisibility(params).promise();
} else {
// Any non-failed status (e.g. the workflow succeeded): delete the SQS message from the source queue
log.debug(`remove message ${receiptHandle} from queue ${queueUrl}`);
await deleteSQSMessage(queueUrl, receiptHandle);
}
// No value to report; resolves once the SQS call above has been awaited.
return Promise.resolve();
}
beforeAll(async () => {
  // Purge the queue at maxQueueUrl before the tests run.
  // `.promise()` is required: without it (and without a callback) the SDK
  // request object is never sent, so awaiting it would purge nothing.
  await sqs().purgeQueue({
    QueueUrl: maxQueueUrl
  }).promise();

  await new Promise((res) => {
    // Wait 10 seconds to allow running executions to finish.
    setTimeout(res, 10000);
  });
});
/**
 * Receive messages from an SQS queue with a single receiveMessage call.
 *
 * @param {string} queueUrl - URL of the queue to read from
 * @param {number} count - number of messages wanted; at most 10 are requested
 * @returns {Promise<Array>} the received messages, or an empty array when the
 *   response carries no `Messages`
 */
const fetchMessages = async (queueUrl, count) => {
  const receiveParams = {
    QueueUrl: queueUrl,
    MaxNumberOfMessages: Math.min(count, 10)
  };
  const response = await aws.sqs().receiveMessage(receiveParams).promise();
  return response.Messages || [];
};
/**
 * Handle one SQS message received from this rule's queue.
 *
 * Only the first part of the function is visible in this excerpt: the retry
 * limit check and the construction of the event object / event source.
 *
 * @param {Object} message - SQS message; `Attributes.ApproximateReceiveCount`,
 *   `MessageId`, `ReceiptHandle` and `Body` are read
 * @returns {Promise} when the retry limit is exceeded, the promise of the
 *   changeMessageVisibility call
 */
function dispatch(message) {
// The queue URL is stored as the rule's value.
const queueUrl = this.rule.rule.value;
const messageReceiveCount = parseInt(message.Attributes.ApproximateReceiveCount, 10);
// (receive count - 1) is the number of earlier deliveries; stop once it exceeds
// the rule's retries (3 when meta.retries is not set).
if (get(this.rule, 'meta.retries', 3) < messageReceiveCount - 1) {
log.debug(`message ${message.MessageId} from queue ${queueUrl} has been processed ${messageReceiveCount - 1} times, no more retries`);
// update visibilityTimeout to 5s
const params = {
QueueUrl: queueUrl,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: 5
};
return sqs().changeMessageVisibility(params).promise();
}
// Log only when this is not the first delivery of the message.
if (messageReceiveCount !== 1) {
log.debug(`message ${message.MessageId} from queue ${queueUrl} is being processed ${messageReceiveCount} times`);
}
// NOTE(review): this shallow-copies message.Body, so Body is presumably an
// already-parsed object; if it were a raw string the copy would hold one key
// per character - confirm against the caller.
const eventObject = Object.assign({}, message.Body);
// Description of where the event came from, built from the SQS message and the rule.
const eventSource = {
type: 'sqs',
messageId: message.MessageId,
queueUrl,
receiptHandle: message.ReceiptHandle,
receivedCount: messageReceiveCount,
deleteCompletedMessage: true,
workflow_name: this.rule.workflow
};
/**
 * Remove what the test created: first the rules, then (in parallel) the test
 * data folder, the collections, the providers and both SQS queues.
 *
 * @returns {Promise<undefined>} resolves once every cleanup step has finished
 */
async function cleanUp() {
  console.log(`\nDeleting rule ${ruleOverride.name}`);

  // Rules are deleted before anything else is torn down.
  const existingRules = await rulesList(config.stackName, config.bucket, ruleDirectory);
  await deleteRules(config.stackName, config.bucket, existingRules, ruleSuffix);

  const removeQueue = (QueueUrl) => sqs().deleteQueue({ QueueUrl }).promise();

  // The remaining steps are started together and awaited as a group.
  const pendingCleanups = [
    deleteFolder(config.bucket, testDataFolder),
    cleanupCollections(config.stackName, config.bucket, collectionsDir, testSuffix),
    cleanupProviders(config.stackName, config.bucket, providersDir, testSuffix),
    removeQueue(queues.queueUrl),
    removeQueue(queues.deadLetterQueueUrl)
  ];
  await Promise.all(pendingCleanups);
}
// Callback: send the JSON-serialized `message` to `queueUrl` and return the resulting promise.
() =>
sqs().sendMessage({ QueueUrl: queueUrl, MessageBody: JSON.stringify(message) }).promise()
);
// Setup: creates the SQS queue, the state machine and the CloudWatch rule, and
// stores their identifiers in variables declared outside this callback.
// (The end of this callback is not visible in this excerpt.)
beforeAll(async () => {
// Lambda name is derived from the stack name.
semaphoreDownLambda = `${config.stackName}-sfSemaphoreDown`;
maxQueueName = `${testName}MaxQueue`;
// Create the queue and keep its URL.
const { QueueUrl } = await sqs().createQueue({
QueueName: maxQueueName
}).promise();
maxQueueUrl = QueueUrl;
// Create the state machine from waitPassSfParams and keep its ARN.
const { stateMachineArn } = await sfn().createStateMachine(waitPassSfParams).promise();
waitPassSfArn = stateMachineArn;
// Names for the rule, its target and its permission come from timestampedName.
ruleName = timestampedName('waitPassSfRule');
ruleTargetId = timestampedName('waitPassSfRuleTarget');
rulePermissionId = timestampedName('waitPassSfRulePermission');
// Create the CloudWatch rule with the state machine ARN and the lambda's function name.
await createCloudwatchRuleWithTarget({
stateMachineArn: waitPassSfArn,
functionName: semaphoreDownLambda,
ruleName,
ruleTargetId,
// Teardown: delete the CloudWatch rule first, then remove the queue, the state
// machine and the semaphore table entry in parallel.
afterAll(async () => {
  const cloudwatchRuleParams = {
    functionName: semaphoreDownLambda,
    ruleName,
    rulePermissionId,
    ruleTargetId
  };
  await deleteCloudwatchRuleWithTargets(cloudwatchRuleParams);

  const queueDeletion = sqs().deleteQueue({ QueueUrl: maxQueueUrl }).promise();
  const stateMachineDeletion = sfn()
    .deleteStateMachine({ stateMachineArn: waitPassSfArn })
    .promise();
  // The semaphore entry is keyed by the queue name.
  const semaphoreDeletion = dynamodbDocClient().delete({
    TableName: `${config.stackName}-SemaphoresTable`,
    Key: { key: maxQueueName }
  }).promise();

  await Promise.all([queueDeletion, stateMachineDeletion, semaphoreDeletion]);
});