const S3UploadPromises = pdrList.map(async (pdrEntry) => {
  const fileName = pdrEntry.name;
  log.info(`FILE: ${fileName}`);

  // Get the file contents
  const pdr = await pdrMod.getPdr(client, folder, fileName);
  log.debug('SUCCESSFULLY RETRIEVED PDR FROM SIPS SERVER');

  // Write the contents out to S3
  const s3PdrKey = `${keyPrefix}/${fileName}`;
  const pdrStream = stringToStream(pdr.pdr);
  await aws.uploadS3FileStream(pdrStream, bucket, s3PdrKey);
  log.debug(`PDR stored at [${s3PdrKey}] in S3 bucket [${bucket}]`);

  return {
    pdr_file_name: fileName,
    s3_bucket: bucket,
    s3_key: s3PdrKey
  };
});
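Since the map callback is async, S3UploadPromises is an array of pending uploads; a minimal sketch of consuming it (the variable name below is illustrative, and this runs inside an async function):

// Await every upload; Promise.all rejects as soon as any upload fails.
const uploadedPdrs = await Promise.all(S3UploadPromises);
log.info(`Uploaded ${uploadedPdrs.length} PDR(s) to S3`);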
async function updateSqsQueue(event) {
  if (!isSqsQueueUpdateNeeded(event)) return 'Not a valid event for updating SQS queue';

  const eventStatus = getSfEventStatus(event);
  const eventMessage = getSfEventMessageObject(event, 'input', '{}');
  const { queueUrl, receiptHandle } = eventMessage.meta.eventSource;

  if (isFailedSfStatus(eventStatus)) {
    // Update visibilityTimeout to 5s so the message can be retried soon
    log.debug(`update message ${receiptHandle} queue ${queueUrl} visibilityTimeout to 5s`);
    const params = {
      QueueUrl: queueUrl,
      ReceiptHandle: receiptHandle,
      VisibilityTimeout: 5
    };
    await sqs().changeMessageVisibility(params).promise();
  } else {
    // Delete the SQS message from the source queue when the workflow succeeded
    log.debug(`remove message ${receiptHandle} from queue ${queueUrl}`);
    await deleteSQSMessage(queueUrl, receiptHandle);
  }
}
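A sketch of the execution-message shape updateSqsQueue expects, inferred from the destructuring above (all values are illustrative):

// Illustrative only -- the shape implied by eventMessage.meta.eventSource:
const exampleEventMessage = {
  meta: {
    eventSource: {
      queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789012/source-queue',
      receiptHandle: 'AQEBexample...'
    }
  }
};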
async function proceed(bucket, provider, filename, counter = 0) {
  // Give up after 270 retries (about 22.5 minutes at 5 seconds per retry)
  if (counter > 270) {
    return false;
  }

  const { globalConnectionLimit } = provider;
  const count = await countLock(bucket, provider.id);
  if (count >= globalConnectionLimit) {
    log.debug({ provider: provider.id }, 'Reached the connection limit, trying again');
    // Wait for 5 seconds and try again
    await delay(5000);
    return proceed(bucket, provider, filename, counter + 1);
  }

  // Add the lock
  await addLock(bucket, provider.id, filename);
  return true;
}
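A minimal usage sketch of proceed(), assuming a removeLock(bucket, providerId, filename) counterpart to addLock and countLock; removeLock and doTransfer below are hypothetical names:

// Acquire a connection slot, do the work, and always release the lock.
const gotLock = await proceed(bucket, provider, granuleId);
if (!gotLock) throw new Error('Could not acquire a connection lock');
try {
  await doTransfer(granuleId); // placeholder for the actual transfer
} finally {
  await removeLock(bucket, provider.id, granuleId); // assumed helper
}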
  const granuleId = this.config.granule_meta.granuleId;
  const version = this.config.granule_meta.version;
  const payload = message.payload;

  // Delete the tracking data for the given granule from DynamoDB.
  // This should be abstracted out to allow other key stores to be used.
  // See JIRA issue GITC-557.
  const params = {
    TableName: tableName,
    Key: {
      'granule-id': granuleId,
      version: version
    }
  };
  log.debug(`Deleting ingest tracking for [${granuleId}]`);
  await docClient.delete(params).promise();

  return payload;
}
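The fragment above assumes docClient is an AWS SDK v2 DynamoDB DocumentClient; a minimal setup sketch:

const AWS = require('aws-sdk');
// DocumentClient marshals plain JS objects to DynamoDB attribute values.
const docClient = new AWS.DynamoDB.DocumentClient();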
// FIXME Can config.folder not be used?
log.info('Received the provider', { provider: get(provider, 'id') });

const Discover = pdr.selector('discover', provider.protocol);
const discover = new Discover(
  stack,
  bucket,
  providerPath,
  provider,
  config.useList,
  'pdrs',
  config.force || false
);

log.debug('Starting PDR discovery');
return discover.discover()
  .then((pdrs) => {
    if (discover.connected) discover.end();
    // Filter the discovered PDRs with the filterPdrs regex, if one was given
    if (filterPdrs && pdrs.length > 0) {
      log.info(`Filtering ${pdrs.length} PDRs with ${filterPdrs}`);
      const fpdrs = pdrs.filter((p) => p.name.match(filterPdrs));
      return { pdrs: fpdrs };
    }
    return { pdrs };
  })
  .catch((e) => {
    log.error(e);
    throw e; // Re-throw so the failure propagates to the caller
  });
function handler(event, context, cb) {
  // We can handle both an incoming message from SNS and a direct payload
  log.debug(JSON.stringify(event));
  const records = get(event, 'Records');

  let jobs = [];
  if (records) {
    jobs = records.map(handlePayload);
  } else {
    jobs.push(handlePayload(event));
  }

  return Promise.all(jobs)
    .then((r) => {
      log.info(`Updated ${r.length} es records`);
      cb(null, r);
      return r;
    })
    .catch(cb);
}
}).catch((e) => {
  log.debug('SyncGranule errored.');
  if (ingest.end) ingest.end();

  // Map low-level connection failures onto the shared error types
  let errorToThrow = e;
  if (e.toString().includes('ECONNREFUSED')) {
    errorToThrow = new errors.RemoteResourceError('Connection Refused');
  } else if (e.details && e.details.status === 'timeout') {
    errorToThrow = new errors.ConnectionTimeout('Connection timed out');
  }

  log.error(errorToThrow);
  throw errorToThrow;
});
};
function handler(event, context) {
  log.debug(event);
  handle(event, context, true, (cb) => {
    if (event.httpMethod === 'GET' && event.resource.includes('/stats/histogram')) {
      histogram(event, cb);
    } else if (event.httpMethod === 'GET' && event.resource.includes('/stats/aggregate')) {
      count(event, cb);
    } else if (event.httpMethod === 'GET' && event.resource.includes('/stats/average')) {
      average(event, cb);
    } else {
      summary(event, cb);
    }
  });
}
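The router above inspects only two fields of the API Gateway proxy event; for example, a GET whose resource contains /stats/histogram is dispatched to histogram():

// Illustrative event (only the fields the router above reads):
const exampleEvent = {
  httpMethod: 'GET',
  resource: '/stats/histogram'
};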
zlib.gunzip(payload, (e, r) => {
  if (e) {
    // Decompression failed; there is nothing to parse
    log.error(e);
    return cb(null);
  }
  try {
    const logs = JSON.parse(r.toString());
    log.debug(logs);
    return indexLog(undefined, logs.logEvents)
      .then((s) => cb(null, s))
      .catch(cb);
  } catch (err) {
    // Log the JSON parse error itself, not the (null) gunzip error
    log.error(err);
    return cb(null);
  }
});
}
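If the callback above serves a CloudWatch Logs subscription, payload would be the decoded awslogs blob, which AWS delivers base64-encoded and gzip-compressed; a sketch of that assumption:

// CloudWatch Logs subscription events carry base64-encoded, gzipped JSON.
const payload = Buffer.from(event.awslogs.data, 'base64');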
async function download(ingest, bucket, provider, granules) {
  log.debug(`awaiting lock.proceed in download() bucket: ${bucket}, `
    + `provider: ${JSON.stringify(provider)}, granuleID: ${granules[0].granuleId}`);
  const proceed = await lock.proceed(bucket, provider, granules[0].granuleId);
  if (!proceed) {
    const err = new errors.ResourcesLockedError('Download lock remained in place after multiple tries');
    log.error(err);
    throw err;
  }

  const ingestGranule = async (granule) => {
    try {
      const startTime = Date.now();
      const r = await ingest.ingest(granule, bucket);
      const endTime = Date.now();

      return {