How to use the scramjet.DataStream.fromArray function in scramjet

To help you get started, we’ve selected a few scramjet examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github openaq / openaq-fetch / fetch.js View on Github external
if (env.dryrun) {
      log.info('--- Dry run for Testing, nothing is saved to the database. ---');
    } else {
      log.info('--- Full fetch started. ---');
    }

    const fetchReport = {
      itemsInserted: 0,
      timeStarted: Date.now(),
      results: null,
      errors: null,
      timeEnded: NaN
    };

    // create a DataStream from sources
    return DataStream.fromArray(Object.values(sources))
      // flatten the sources
      .flatten()
      // set parallel limits
      .setOptions({maxParallel: maxParallelAdapters})
      // filter sources - if env is set then choose only matching source,
      //   otherwise filter out inactive sources.
      // * inactive sources will be run if called by name in env.
      .use(chooseSourcesBasedOnEnv, env, runningSources)
      // mark sources as started
      .do(markSourceAs('started', runningSources))
      // get measurements object from given source
      // all error handling should happen inside this call
      .use(fetchCorrectedMeasurementsFromSourceStream, env)
      // perform streamed save to DB and S3 on each source.
      .use(streamMeasurementsToDBAndStorage, env)
      // mark sources as finished

scramjet

Lightweight and real-time data functional stream programming framework like event-stream, written in ES6 using async await with multi-threading and typescript support

MIT
Latest version published 1 year ago

Package Health Score

55 / 100
Full package analysis