// Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
var fs = require("fs");
var csv = require("fast-csv");
var stream1 = fs.createReadStream("files/testCsvFile.csv");
var {DataStream} = require("scramjet");
DataStream
// the following line will convert any stream to scramjet.DataStream
.from(csv.fromStream(stream2, { headers: true }))
// the next lines controls how many simultaneous operations are made
// I assumed 1, but if you're fine with 40 - go for it.
.setOptions({maxParallel: 1})
// the next line will call your async function and wait until it's completed
// and control the back-pressure of the stream
.do(async (data) => {
const query = await queryBuilder({
schema,
routine,
parameters,
request
}); //here we prepare query for calling the SP with parameters from data
winston.info(query + JSON.stringify(data));
// NOTE(review): orphaned fragment pasted from a different snippet (a CAAQMS
// air-quality scraper). It defines an anonymous async arrow function that is
// never assigned or called here, and the text is cut off below inside the
// `attribution` array literal. `requestOptions`, `rp`, `DataStream` and
// `acceptableParameters` are not defined in this excerpt.
async (measurements, {coords, station_id: stationId}) => {
// Object.assign mutates its first argument, so this also mutates the shared
// `requestOptions` object on every call — presumably unintended; verify.
const options = Object.assign(requestOptions, {
url: 'https://app.cpcbccr.com/caaqms/caaqms_viewdata_v2',
// The endpoint expects a base64-encoded JSON body selecting the station.
body: Buffer.from(`{"site_id":"${stationId}"}`).toString('base64'),
resolveWithFullResponse: true
});
try {
const response = await rp(options);
const {siteInfo, tableData: {bodyContent}} = JSON.parse(response.body);
await (
DataStream
.from(bodyContent)
.each(async p => {
// Normalise the pollutant name: lower-case, strip the first '.',
// map 'ozone' to 'o3'. Note String.replace with a string pattern
// removes only the FIRST dot.
let parameter = p.parameters.toLowerCase().replace('.', '');
parameter = (parameter === 'ozone') ? 'o3' : parameter;
// Make sure we want the pollutant
if (!acceptableParameters.includes(parameter)) {
return;
}
let m = {
averagingPeriod: {unit: 'hours', value: 0.25},
city: siteInfo.city,
location: siteInfo.siteName,
coordinates: coords,
attribution: [{
// NOTE(review): fragment truncated here mid-array; the rest of the
// measurement object and of the handler is missing.
const request = require("request");
const rp = require("request-promise-native");
const { StringStream } = require("scramjet");
// NOTE(review): separate example fragment (scramjet-docs style): stream an HTTP
// response line-by-line and POST each show to a local API. It is cut off below
// inside the rp() options object, and the chain is never consumed.
StringStream.from( // fetch your API to a scramjet stream
request("https://api.example.org/v1/shows/list")
)
.setOptions({maxParallel: 4}) // set your options
.lines() // split the stream by line
// NOTE(review): `theirShow` here is the raw line string; a JSON.parse step
// appears to be missing before these property accesses — confirm against the
// original snippet.
.parse(theirShow => { // parse strings to data
return {
id: theirShow.id,
title: theirShow.name,
url: theirShow.url
};
})
.map(async myShow => rp({ // use asynchronous mapping (for example send requests)
method: "POST",
simple: true,
uri: `http://api.local/set/${myShow.id}`,
body: JSON.stringify(myShow)
// NOTE(review): fragment of an openaq-style fetch driver; it begins mid-function
// (`env`, `log`, `sources`, `runningSources`, `maxParallelAdapters` and the
// `use`d helpers are defined elsewhere) and is truncated after its last comment.
if (env.dryrun) {
log.info('--- Dry run for Testing, nothing is saved to the database. ---');
} else {
log.info('--- Full fetch started. ---');
}
// Aggregate bookkeeping for this run; timeEnded stays NaN until the run ends.
const fetchReport = {
itemsInserted: 0,
timeStarted: Date.now(),
results: null,
errors: null,
timeEnded: NaN
};
// create a DataStream from sources
return DataStream.fromArray(Object.values(sources))
// flatten the sources
.flatten()
// set parallel limits
.setOptions({maxParallel: maxParallelAdapters})
// filter sources - if env is set then choose only matching source,
// otherwise filter out inactive sources.
// * inactive sources will be run if called by name in env.
.use(chooseSourcesBasedOnEnv, env, runningSources)
// mark sources as started
.do(markSourceAs('started', runningSources))
// get measurements object from given source
// all error handling should happen inside this call
.use(fetchCorrectedMeasurementsFromSourceStream, env)
// perform streamed save to DB and S3 on each source.
.use(streamMeasurementsToDBAndStorage, env)
// mark sources as finished
// NOTE(review): truncated here — the call this comment describes (presumably
// .do(markSourceAs('finished', ...))) is missing from the excerpt.
// NOTE(review): fragment — maps each pollutant name to a StringStream that
// downloads and parses `<country>_<pollutant>.csv`, keeping only rows inserted
// within the last two hours. `pollutants`, `source`, `request`, `moment` and
// `StringStream` come from the surrounding (missing) module. Truncated after
// the eslint-disable comment: the non-strict comparison it suppresses is gone.
pollutants.map(pollutant => {
const url = source.url + source.country + '_' + pollutant + '.csv';
const timeLastInsert = moment().utc().subtract(2, 'hours');
let header;
return new StringStream()
.use(stream => {
const resp = request.get({url})
.on('response', ({statusCode}) => {
// Anything other than HTTP 200 ends the stream empty instead of piping.
+statusCode !== 200
? stream.end()
: resp.pipe(stream);
});
return stream;
})
.CSVParse({header: false, delimiter: ',', skipEmptyLines: true})
// Capture the first row as the header, then keep only rows of matching width.
.shift(1, columns => (header = columns[0]))
.filter(o => o.length === header.length)
// Zip each row with the header into a plain {column: value} object.
.map(o => header.reduce((a, c, i) => { a[c] = o[i]; return a; }, {}))
// TODO: it would be good to provide the actual last fetch time so that we can filter already inserted items in a better way
.filter(o => moment(o.value_datetime_inserted).utc().isAfter(timeLastInsert))
// eslint-disable-next-line eqeqeq
// NOTE(review): first, shorter copy of `getStream` — a second copy of the same
// function starts immediately after this fragment, which is itself cut off
// inside the header forEach. One of the duplicates should be removed once the
// file is repaired. `getUnit`, `moment`, `timezone`, `log`, `request` and
// `StringStream` are defined elsewhere.
export const getStream = function (cityName, url, averagingPeriod, source, orgUrl) {
const { metadata } = source;
// Pull pollutant code and year out of a name like "XX_pm10_2020(_gg).txt".
// NOTE(review): `match` is null when the URL doesn't fit this pattern and the
// next line would throw — confirm callers guarantee the format.
const match = url.match(/[\w]{2}_([\w.]{2,})_([\d]{4})(?:_gg)?.txt/);
const parameter = match[1].toLowerCase().replace('.', ''); // removes only the first '.'
const year = match[2];
const unit = getUnit(parameter);
// NOTE(review): presumably selects which column holds the day for hourly vs
// daily files — confirm against the data format.
const dayPosition = averagingPeriod.value === 1 ? 0 : 1;
const fewDaysAgo = +Number(moment.tz(timezone).subtract(4, 'days').format('DDD'));
log.verbose(`Fetching data from ${url}`);
const stations = {};
return StringStream.from(
request(url)
)
.lines(StringStream.SPLIT_LINE)
// Collapse runs of whitespace and trim each line before splitting.
.map(x => x.replace(/\s+/g, ' ').replace(/^\s+|\s+$/g, ''))
.parse(
row =>
row
.trim()
.split(/\s+/g)
)
.shift(1, ([header]) => {
header
.slice(2)
.forEach((x, i) => {
if (+x) {
// NOTE(review): Object.assign with a single argument returns
// `metadata[x]` itself (no copy) — likely meant Object.assign({}, ...).
stations[i] = Object.assign(metadata[x]);
export const getStream = function (cityName, url, averagingPeriod, source, orgUrl) {
const { metadata } = source;
const match = url.match(/[\w]{2}_([\w.]{2,})_([\d]{4})(?:_gg)?.txt/);
const parameter = match[1].toLowerCase().replace('.', '');
const year = match[2];
const unit = getUnit(parameter);
const dayPosition = averagingPeriod.value === 1 ? 0 : 1;
const fewDaysAgo = +Number(moment.tz(timezone).subtract(4, 'days').format('DDD'));
log.verbose(`Fetching data from ${url}`);
const stations = {};
return StringStream.from(
request(url)
)
.lines(StringStream.SPLIT_LINE)
.map(x => x.replace(/\s+/g, ' ').replace(/^\s+|\s+$/g, ''))
.parse(
row =>
row
.trim()
.split(/\s+/g)
)
.shift(1, ([header]) => {
header
.slice(2)
.forEach((x, i) => {
if (+x) {
stations[i] = Object.assign(metadata[x]);
}
});
})
const {StringStream} = require('scramjet');
const wordcount = require('wordcount');
const fetch = require('node-fetch');
const htmlToText = require('html-to-text');
const {promisify} = require('util');
// Word-count a list of pages: fetch each URL (at most 4 in flight at once),
// strip the HTML down to plain text, and log a {url, count} record per page.
StringStream.fromArray(["https://stackoverflow.com/", "https://caolan.github.io/async/docs.html#eachLimit"])
  .setOptions({maxParallel: 4})
  // Pair every URL with its (pending) HTTP response.
  .parse(async (url) => {
    const response = await fetch(url);
    return { url, response };
  })
  // Resolve the body, flatten markup to text, and count the words.
  .map(async (page) => {
    const html = await page.response.text();
    const plain = htmlToText.fromString(html);
    return {
      url: page.url,
      count: wordcount(plain)
    };
  })
  .each(console.log)
/**
 * Start fetching measurements for `source` and return a DataStream that the
 * caller can consume immediately; data is piped into it once the metadata and
 * pollutant streams are ready.
 *
 * @param {Object} source - source descriptor handed to fetchMetadata/fetchPollutants
 * @returns {DataStream} stream that will receive the fetched records
 */
export function fetchStream (source) {
  const out = new DataStream();
  out.name = 'unused';
  log.debug('Fetch stream called');
  fetchMetadata(source)
    .then((stations) => fetchPollutants(source, stations))
    .then(stream => stream.pipe(out))
    // FIX: the promise chain was floating — a rejection in fetchMetadata or
    // fetchPollutants was silently swallowed and `out` never ended or errored.
    // Surface the failure on the returned stream so consumers see it.
    .catch(error => out.emit('error', error));
  return out;
}
/**
 * Pipe measurement records into the Postgres `measurements` table one at a
 * time, tagging each record with an insert status. A duplicate-key violation
 * (Postgres error code 23505) is treated as a normal outcome, not an error.
 *
 * @param {Stream} stream - incoming measurement stream
 * @param {Function} pg - knex instance bound to the target database
 * @returns {DataStream} records annotated with {status: 'inserted'|'duplicate'}
 */
function streamRecordsToPg (stream, pg) {
  const postgis = require('knex-postgis')(pg);
  // NOTE(review): this query builder is shared across every insert below —
  // confirm knex tolerates repeated .insert() calls on one builder.
  const measurements = pg('measurements')
    .returning('location');
  const throttled = stream
    .tap()
    .pipe(new DataStream())
    .setOptions({maxParallel: 1});
  return throttled.assign(async (measurement) => {
    const row = convertMeasurementToSQLObject(measurement, postgis, pg);
    try {
      await measurements.insert(row);
      return { status: 'inserted' };
    } catch (err) {
      // Unique-constraint conflicts mean the record already exists.
      if (err.code !== '23505') {
        throw err;
      }
      return { status: 'duplicate' };
    }
  });
}