Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
pollutants.map(pollutant => {
const url = source.url + source.country + '_' + pollutant + '.csv';
const timeLastInsert = moment().utc().subtract(2, 'hours');
let header;
return new StringStream()
.use(stream => {
const resp = request.get({url})
.on('response', ({statusCode}) => {
+statusCode !== 200
? stream.end()
: resp.pipe(stream);
});
return stream;
})
.CSVParse({header: false, delimiter: ',', skipEmptyLines: true})
.shift(1, columns => (header = columns[0]))
.filter(o => o.length === header.length)
.map(o => header.reduce((a, c, i) => { a[c] = o[i]; return a; }, {}))
// TODO: it would be good to provide the actual last fetch time so that we can filter already inserted items in a better way
.filter(o => moment(o.value_datetime_inserted).utc().isAfter(timeLastInsert))
// eslint-disable-next-line eqeqeq