Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
} catch (err) {
console.error(err);
}
// Test convenience method "parse"
try {
const csv = parse(data, opts);
console.log(csv);
} catch (err) {
console.error(err);
}
// Test for Asynchronous Parser
const transformOpts = { highWaterMark: 8192 };
const asyncParser = new AsyncParser(opts, transformOpts);
let csv = '';
asyncParser.processor
.on('data', chunk => (csv += chunk.toString()))
.on('end', () => console.log(csv))
.on('error', err => console.error(err));
// Test for transform events
asyncParser.transform
.on('header', header => console.log(header))
.on('line', line => console.log(line))
.on('error', err => console.log(err));
asyncParser.input.push(data); // This data might come from an HTTP request, etc.
asyncParser.input.push(null); // Sending `null` to a stream signal that no more data is expected and ends it.
export const transform = (parserOpts = {}) => observable => {
// XXX: the parser wants to add a newline to everything we push to it for
// some reason. likely "user error"
const parser = new AsyncParser(
{...parserOpts, eol: '', flatten: true},
{
objectMode: true
}
);
return observable.pipe(
finalize(() => {
parser.input.push(null);
}),
tap(row => {
parser.input.push(row);
}),
concatMapTo(fromEvent(parser.processor, 'data')),
takeUntil(fromEvent(parser.processor, 'end')),
filter(Boolean),
map(_.pipe(_.trim, stripAnsi))
export const toCsv = (parserOpts = {}, streamOpts = {}) => observable => {
const parser = new AsyncParser(parserOpts, {...streamOpts, objectMode: true});
return observable.pipe(
finalize(() => {
parser.input.push(null);
}),
tap(row => {
parser.input.push(row);
}),
concatMapTo(fromEvent(parser.processor, 'data')),
takeUntil(fromEvent(parser.processor, 'end')),
map(_.trimEnd)
);
};