Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async list() {
const listFn = this.sftpClient.list.bind(this.sftpClient);
const files = await recursion(listFn, this.path);
log.info({ host: this.host }, `${files.length} files were found on ${this.host}`);
// Type 'type' field is required to support recursive file listing, but
// should not be part of the returned result.
return files.map((file) => omit(file, 'type'));
}
.map((s3Object) => ({ Bucket: logsBucket, Key: s3Object.Key }));
log.info(`Found ${s3Objects.length} log files in S3`);
// Fetch all distribution events from S3
const allDistributionEvents = flatten(await pMap(
s3Objects,
getDistributionEventsFromS3Object,
{ concurrency: 5 }
));
log.info(`Found a total of ${allDistributionEvents.length} distribution events`);
const distributionEventsInReportPeriod = allDistributionEvents.filter(eventTimeFilter);
log.info(`Found ${allDistributionEvents.length} distribution events between `
+ `${reportStartTime.format()} and ${reportEndTime.format()}`);
return (await Promise.all(distributionEventsInReportPeriod
.sort(sortByTime)
.map((event) => event.toString())))
.join('\n');
}
/**
 * Generate a distribution report for a reporting period and upload it to S3.
 *
 * @param {Object} params
 * @param {*} params.reportStartTime - start of the reporting period; forwarded
 *   to generateDistributionReport and determineReportKey
 * @param {*} params.reportEndTime - end of the reporting period; forwarded to
 *   generateDistributionReport
 * @returns {Promise<Object>} resolves to `{ reportType, file }`, where `file`
 *   is the S3 URI the report was uploaded to
 */
async function generateAndStoreDistributionReport(params) {
  const { reportStartTime, reportEndTime } = params;

  // Only the two period boundaries are forwarded to the report generator.
  const reportBody = await generateDistributionReport({ reportStartTime, reportEndTime });

  const { reportsBucket, reportsPrefix } = bucketsPrefixes();
  const key = await determineReportKey(DISTRIBUTION_REPORT, reportStartTime, reportsPrefix);
  const s3Uri = aws.buildS3Uri(reportsBucket, key);

  log.info(`Uploading report to ${s3Uri}`);

  const uploadRequest = aws.s3().putObject({
    Bucket: reportsBucket,
    Key: key,
    Body: reportBody
  });
  await uploadRequest.promise();

  return { reportType: DISTRIBUTION_REPORT, file: s3Uri };
}
// Export to support testing
/**
 * Build the EMS metadata report for a time range and upload it to S3.
 *
 * Each collection returned by getCollectionsForEms contributes one line made
 * of the values of its `emsRecord`, joined with `|&|`.
 *
 * @param {*} startTime - start of the range; forwarded to getCollectionsForEms
 *   and determineReportKey
 * @param {*} endTime - end of the range; forwarded to getCollectionsForEms
 * @returns {Promise<Object>} resolves to `{ reportType, file }`, where `file`
 *   is the S3 URI the report was uploaded to
 */
async function generateReport(startTime, endTime) {
  log.debug(`ems-metadata-report.generateReport startTime: ${startTime} endTime: ${endTime}`);

  const reportType = 'metadata';
  const emsCollections = await getCollectionsForEms(startTime, endTime);

  // One line per collection, fields separated by the '|&|' delimiter.
  const rows = emsCollections.map(({ emsRecord }) => Object.values(emsRecord).join('|&|'));
  const report = rows.join('\n');

  const { reportsBucket, reportsPrefix } = bucketsPrefixes();
  const reportKey = await determineReportKey(reportType, startTime, reportsPrefix);
  const s3Uri = buildS3Uri(reportsBucket, reportKey);

  log.info(`Uploading report to ${s3Uri}`);

  const uploadRequest = s3().putObject({
    Bucket: reportsBucket,
    Key: reportKey,
    Body: report
  });
  await uploadRequest.promise();

  return { reportType, file: s3Uri };
}
/**
 * Upload a local report file to S3 and delete the local copy afterwards.
 *
 * @param {string} filename - path of the local file to upload
 * @param {string} reportBucket - destination S3 bucket
 * @param {string} reportKey - destination S3 key
 * @returns {Promise<string>} the S3 URI of the uploaded object
 */
async function uploadReportToS3(filename, reportBucket, reportKey) {
  const s3Client = aws.s3();
  const uploadRequest = s3Client.putObject({
    Bucket: reportBucket,
    Key: reportKey,
    Body: fs.createReadStream(filename)
  });
  await uploadRequest.promise();

  // Reached only when the upload succeeded; a failed upload leaves the
  // local file in place.
  fs.unlinkSync(filename);

  const uploadedUri = aws.buildS3Uri(reportBucket, reportKey);
  log.info(`uploaded ${uploadedUri}`);
  return uploadedUri;
}
/**
 * Publish a bad Kinesis record to the fallback SNS topic.
 *
 * The topic ARN is read from the `FallbackTopicArn` environment variable and
 * the record is sent as its JSON serialization.
 *
 * @param {Object} record - the record to publish
 * @returns {Promise<Object>} the result of the SNS publish request
 */
async function publishRecordToFallbackTopic(record) {
  const { FallbackTopicArn: topicArn } = process.env;

  log.info('publishing bad kinesis record to Topic:', topicArn);
  log.info('record:', JSON.stringify(record));

  const snsClient = sns();
  const publishRequest = snsClient.publish({
    TopicArn: topicArn,
    Message: JSON.stringify(record)
  });
  return publishRequest.promise();
}
/**
 * Ingest a UMM-G granule into the CMR and describe the published result.
 *
 * @param {Object} cmrPublishObject - object whose `metadataObject` holds the
 *   UMM-G metadata to ingest (its `GranuleUR` is used as the granule id)
 * @param {Object} cmrClient - client exposing `ingestUMMGranule(metadata)`
 * @returns {Promise<Object>} `{ granuleId, conceptId, link, metadataFormat }`
 *   for the ingested granule
 */
async function publishUMMGJSON2CMR(cmrPublishObject, cmrClient) {
  const { metadataObject } = cmrPublishObject;
  const granuleId = metadataObject.GranuleUR;

  const ingestResponse = await cmrClient.ingestUMMGranule(metadataObject);
  const conceptId = ingestResponse['concept-id'];
  log.info(`Published UMMG ${granuleId} to the CMR. conceptId: ${conceptId}`);

  const searchUrl = getUrl('search', null, process.env.CMR_ENVIRONMENT);
  const link = `${searchUrl}granules.json?concept_id=${conceptId}`;
  const metadataFormat = ummVersionToMetadataFormat(ummVersion(metadataObject));

  return { granuleId, conceptId, link, metadataFormat };
}
/**
 * Publish a bad Kinesis record to the fallback SNS topic.
 *
 * The topic ARN is read from the `FallbackTopicArn` environment variable and
 * the record is sent as its JSON serialization.
 *
 * @param {Object} record - the record to publish
 * @returns {Promise<Object>} the result of the SNS publish request
 */
async function publishRecordToFallbackTopic(record) {
  const { FallbackTopicArn: topicArn } = process.env;

  log.info('publishing bad kinesis record to Topic:', topicArn);
  log.info('record:', JSON.stringify(record));

  const snsClient = sns();
  const publishRequest = snsClient.publish({
    TopicArn: topicArn,
    Message: JSON.stringify(record)
  });
  return publishRequest.promise();
}