// Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
/**
 * Starts an asynchronous fetch pipeline for `source` and returns the output
 * stream immediately; station metadata and pollutant data are piped into it
 * once the upstream fetches resolve.
 *
 * @param {Object} source - source descriptor passed to fetchMetadata/fetchPollutants
 * @returns {DataStream} stream that will receive the fetched measurements
 */
export function fetchStream (source) {
  const out = new DataStream();
  out.name = 'unused';

  log.debug('Fetch stream called');

  fetchMetadata(source)
    .then((stations) => fetchPollutants(source, stations))
    .then(stream => stream.pipe(out))
    // FIX: the original chain was floating — a rejected fetch produced an
    // unhandled promise rejection and `out` never ended. Surface the failure
    // on the returned stream so consumers can handle it.
    .catch(error => out.raise(error))
  ;

  return out;
}
// Inserts each measurement flowing through `stream` into the Postgres
// `measurements` table via knex, annotating every item with an insert status
// ('inserted' or 'duplicate') before passing it downstream.
// NOTE(review): `table` is a single knex builder reused for every insert —
// presumably `.insert()` clones rather than mutates; confirm against the knex
// version in use.
function streamRecordsToPg (stream, pg) {
// knex-postgis augments the knex instance with PostGIS helpers used by
// convertMeasurementToSQLObject (geometry columns).
const st = require('knex-postgis')(pg);
const table = pg('measurements')
.returning('location');
return stream
.tap()
.pipe(new DataStream())
// One insert in flight at a time — keeps duplicate detection deterministic
// and avoids flooding the connection pool.
.setOptions({maxParallel: 1})
.assign(async measurement => {
const record = convertMeasurementToSQLObject(measurement, st, pg);
try {
await table.insert(record);
} catch (cause) {
// 23505 = Postgres unique_violation: the measurement already exists
// (expected for overlapping fetch windows) — tag it, don't fail the stream.
if (cause.code === '23505') {
return { status: 'duplicate' };
}
// Anything else is a real DB error; propagate it.
throw cause;
}
// scramjet's `assign` merges this object into the measurement object.
return { status: 'inserted' };
});
}
// NOTE(review): everything from here to the end of the file appears to be a
// corrupted merge of several separate definitions — an anonymous async fetch
// task, saveResultsToS3, streamMeasurementsToDBAndStorage, and two adapter /
// scraper snippets. Bodies are interleaved and cut mid-scope, so this region
// is not syntactically valid JavaScript. Comments below mark the apparent
// fragment boundaries; recover the original functions from version control
// before changing behavior here.
async (out, source) => {
const failures = {};
const input = new DataStream();
let error = null;
// Builds the measurement-processing pipeline; `stream` and `env` here are
// not defined in this fragment — presumably closed over in the original file.
const output = input
.use(fixMeasurements, source)
.use(validateMeasurements, source)
.use(removeUnwantedParameters)
.use(handleMeasurementErrors, failures, source)
.use(forwardErrors, stream, source, failures, env)
;
try {
log.debug(`Looking up adapter for source "${source && source.name}"`);
const adapter = await getAdapterForSource(source);
(await getStreamFromAdapter(adapter, source)).pipe(input);
// NOTE(review): fragment boundary — saveResultsToS3 begins here, spliced
// into the middle of the try block above.
function saveResultsToS3 (output, s3, bucketName, key, s3ChunkSize) {
const stream = new DataStream();
const finishing = stream
.catch(ignore)
// Strip the `status` tag added by the DB stage before persisting to S3.
.map(({status, ...measurement}) => measurement)
.use(streamDataToS3, s3, bucketName, key, s3ChunkSize)
.run()
.catch(e => output.raise(e));
// Idempotent end: guard so the stream is only ended once.
const endStream = () => {
if (!ended) stream.end();
ended = true;
};
let ended = false;
output.whenEnd().then(endStream).catch(ignore);
// NOTE(review): fragment boundary — streamMeasurementsToDBAndStorage
// begins here; its `export` keyword confirms it was a top-level function.
export function streamMeasurementsToDBAndStorage (sourcesStream, {doSaveToS3, s3ChunkSize, dryrun, bucketName}) {
if (dryrun) {
// Dry run: log each measurement instead of writing to DB/S3.
return sourcesStream.do(async ({ stream: measurementStream }) => {
return measurementStream
.do(m => log.verbose(JSON.stringify(m)))
.run();
});
} else {
const output = new DataStream();
let s3stream = null;
if (doSaveToS3) {
// One NDJSON object per run, keyed by date + unix timestamp.
const key = `realtime/${moment().format('YYYY-MM-DD/X')}.ndjson`;
s3stream = saveResultsToS3(output, new (require('aws-sdk').S3)(), bucketName, key, s3ChunkSize);
}
sourcesStream
.map(async (item) => {
const pg = await getDB();
const { stream: measurementStream, counts } = item;
const stream = measurementStream.use(streamDataToDB, pg, counts);
await (doSaveToS3 ? s3stream.pull(stream) : stream.run());
return item;
// NOTE(review): fragment boundary — the code below is from a different
// function (a CPCB station scraper); the `return item;` above is
// immediately followed by an unreachable second return.
return DataStream
.from(
request(options)
.pipe(JSONStream.parse('map.station_list.*'))
)
.setOptions({maxParallel: 5})
.into(
(siteStream, site) => {
return siteStream.whenWrote({
station_id: site['station_id'],
station_name: site['station_name'],
coords: {latitude: Number(site['latitude']), longitude: Number(site['longitude'])}
});
},
new DataStream()
)
.into(
async (measurements, {coords, station_id: stationId}) => {
const options = Object.assign(requestOptions, {
url: 'https://app.cpcbccr.com/caaqms/caaqms_viewdata_v2',
body: Buffer.from(`{"site_id":"${stationId}"}`).toString('base64'),
resolveWithFullResponse: true
});
try {
const response = await rp(options);
const {siteInfo, tableData: {bodyContent}} = JSON.parse(response.body);
await (
DataStream
.from(bodyContent)
// NOTE(review): fragment boundary — lines from the async fetch task at
// the top of this region reappear here, spliced mid-expression.
(await getStreamFromAdapter(adapter, source)).pipe(input);
if (error) throw error;
else error = true;
} catch (cause) {
// Wrap non-adapter failures so downstream handlers see a uniform type.
await (
input.raise(
cause instanceof AdapterError ? cause : new AdapterError(ADAPTER_ERROR, source, cause)
)
);
input.end();
}
await out.whenWrote(createFetchObject(output, source, failures, env.dryrun));
},
new DataStream()
);
}