Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
function handleProcessRecordError(error, record, fromSNS, isKinesisRetry) {
if (!isKinesisRetry) {
if (fromSNS) {
log.error('Failed SNS message:');
log.error(JSON.stringify(record));
throw error;
}
return publishRecordToFallbackTopic(record)
.then((res) => {
log.debug('sns result:', res);
return res;
})
.catch((snsError) => {
// We couldn't publish the record to the fallback Topic, so we will log
// and throw the original error. Kinesis polling will pick up this
// record again and retry.
log.error(`Failed to publish record to fallback topic: ${record}`);
log.error(`original error: ${error}`);
log.error(`subsequent error: ${snsError}`);
throw error;
});
function handleProcessRecordError(error, record, fromSNS, isKinesisRetry) {
if (!isKinesisRetry) {
if (fromSNS) {
log.error('Failed SNS message:');
log.error(JSON.stringify(record));
throw error;
}
return publishRecordToFallbackTopic(record)
.then((res) => {
log.debug('sns result:', res);
return res;
})
.catch((snsError) => {
// We couldn't publish the record to the fallback Topic, so we will log
// and throw the original error. Kinesis polling will pick up this
// record again and retry.
log.error(`Failed to publish record to fallback topic: ${record}`);
log.error(`original error: ${error}`);
log.error(`subsequent error: ${snsError}`);
throw error;
let dataBlob;
if (fromSNS) {
// Kinesis fallback SNS notification
isKinesisRetry = true;
dataBlob = parsed.kinesis.data;
} else {
dataBlob = record.kinesis.data;
}
try {
validationSchema = kinesisSchema;
originalMessageSource = 'kinesis';
const dataString = Buffer.from(dataBlob, 'base64').toString();
eventObject = JSON.parse(dataString);
ruleParam = eventObject.collection;
} catch (err) {
log.error('Caught error parsing JSON:');
log.error(err);
// TODO (out of scope): does it make sense to attempt retrying bad JSON?
return handleProcessRecordError(err, record, isKinesisRetry, fromSNS);
}
}
return validateMessage(eventObject, originalMessageSource, validationSchema)
.then(() => getRules(ruleParam, originalMessageSource))
.then((rules) => (
Promise.all(rules.map((rule) => {
if (originalMessageSource === 'sns') set(rule, 'meta.snsSourceArn', ruleParam);
return queueMessageForRule(rule, eventObject);
}))))
.catch((err) => {
log.error('Caught error in processRecord:');
log.error(err);
const parsedUrl = parseS3Uri(file.filename);
Bucket = parsedUrl.Bucket;
Key = parsedUrl.Key;
} else {
throw new Error(`Unable to determine file location: ${JSON.stringify(file)}`);
}
try {
const headObjectResponse = await headObject(Bucket, Key);
return {
filename: file.filename,
size: headObjectResponse.ContentLength,
LastModified: headObjectResponse.LastModified
};
} catch (err) {
log.error(`Failed to headObject the object at ${Bucket}/${Key} in s3.`);
throw (err);
}
}
.catch((err) => {
log.error('Caught error in processRecord:');
log.error(err);
return handleProcessRecordError(err, record, isKinesisRetry, fromSNS);
});
}
client.ls(path, (err, data) => {
client.destroy();
if (err) {
if (err.message.includes('Timed out') && counter < 3) {
log.error(`Connection timed out while listing ${path}. Retrying...`);
counter += 1;
return this._list(path, counter).then((r) => {
log.info(`${counter} retry suceeded`);
return resolve(r);
}).catch((e) => reject(e));
}
return reject(err);
}
return resolve(data.map((d) => ({
name: d.name,
path: path,
size: parseInt(d.size, 10),
time: d.time,
type: d.type
})));
.catch((error) => {
log.error(`Failed to copy s3://${CopySource} to s3://${target.Bucket}/${target.Key}: ${error.message}`);
throw error;
});
}
async function publish(cmrFile, creds, bucket, stack) {
let password;
try {
password = await DefaultProvider.decrypt(creds.password, undefined, bucket, stack);
}
catch (error) {
log.error('Decrypting password failed, using unencrypted password', error);
password = creds.password;
}
const cmr = new CMR(
creds.provider,
creds.clientId,
creds.username,
password
);
const xml = cmrFile.metadata;
const res = await cmr.ingestGranule(xml);
const conceptId = res.result['concept-id'];
log.info(`Published ${cmrFile.granuleId} to the CMR. conceptId: ${conceptId}`);
return {
.catch((err) => {
log.error('Caught error in processNotification:');
log.error(err);
throw err;
});
}