How to use the scramjet.StringStream function in scramjet

To help you get started, we’ve selected a few scramjet examples, based on popular ways it is used in public projects.


openaq / openaq-fetch / adapters / eea-direct.js
pollutants.map(pollutant => {
      const url = source.url + source.country + '_' + pollutant + '.csv';
      const timeLastInsert = moment().utc().subtract(2, 'hours');
      let header;

      return new StringStream()
        .use(stream => {
          const resp = request.get({url})
            .on('response', ({statusCode}) => {
              +statusCode !== 200
                ? stream.end()
                : resp.pipe(stream);
            });
          return stream;
        })
        .CSVParse({header: false, delimiter: ',', skipEmptyLines: true})
        .shift(1, columns => (header = columns[0]))
        .filter(o => o.length === header.length)
        .map(o => header.reduce((a, c, i) => { a[c] = o[i]; return a; }, {}))
        // TODO: it would be good to provide the actual last fetch time so that we can filter already inserted items in a better way
        .filter(o => moment(o.value_datetime_inserted).utc().isAfter(timeLastInsert))
        // eslint-disable-next-line eqeqeq
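
The header handling in the snippet above (take the first parsed row as the header, drop rows whose length does not match it, then turn each remaining row into an object) can be sketched without any stream at all. This is a minimal illustration on plain arrays, not the openaq-fetch implementation: the helper name `rowsToObjects` and the sample rows are assumptions made up for this example.

```javascript
// Hypothetical helper (not part of scramjet or openaq-fetch): converts parsed
// CSV rows (arrays of strings) into objects keyed by the first row.
function rowsToObjects(rows) {
  if (rows.length === 0) return [];

  // The first row plays the role of the header captured by .shift(1, ...).
  const [header, ...records] = rows;

  return records
    // Same check as the snippet's first .filter(): skip malformed rows.
    .filter(row => row.length === header.length)
    // Same reduce as the snippet's .map(): pair each column name with its value.
    .map(row => header.reduce((acc, column, i) => {
      acc[column] = row[i];
      return acc;
    }, {}));
}

// Made-up sample data; the column names are illustrative only.
const sampleRows = [
  ['station_code', 'value_numeric'],
  ['ST001', '12.5'],
  ['ST002'], // too short, so it is dropped
  ['ST003', '7.1']
];

const objects = rowsToObjects(sampleRows);
console.log(objects);
```

In the streaming version the same logic runs one row at a time, so the full file never has to be held in memory.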

scramjet

A lightweight, real-time functional stream programming framework, similar to event-stream, written in ES6 using async/await, with multi-threading and TypeScript support.

License: MIT