Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
// Announce the run mode once, up front: a truthy env.dryrun selects the
// dry-run message, anything else selects the full-fetch message.
log.info(
env.dryrun
? '--- Dry run for Testing, nothing is saved to the database. ---'
: '--- Full fetch started. ---'
);

// Report object for this run. Only the start time and the zeroed counter
// carry real values at this point; the remaining fields are placeholders.
// NOTE(review): presumably results/errors/timeEnded are filled in later in
// the pipeline - that code is not visible in this section.
const fetchReport = {
// running count, starts at zero
itemsInserted: 0,
// epoch milliseconds captured when the report is created
timeStarted: Date.now(),
results: null,
errors: null,
// not-a-number until an end time is recorded
timeEnded: Number.NaN
};
// create a DataStream from sources
return DataStream.fromArray(Object.values(sources))
// flatten the sources
.flatten()
// set parallel limits
.setOptions({maxParallel: maxParallelAdapters})
// filter sources - if env is set then choose only matching source,
// otherwise filter out inactive sources.
// * inactive sources will be run if called by name in env.
.use(chooseSourcesBasedOnEnv, env, runningSources)
// mark sources as started
.do(markSourceAs('started', runningSources))
// get measurements object from given source
// all error handling should happen inside this call
.use(fetchCorrectedMeasurementsFromSourceStream, env)
// perform streamed save to DB and S3 on each source.
.use(streamMeasurementsToDBAndStorage, env)
// mark sources as finished