Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
parse(input, output) {
this.logger.info('Starting conversion')
let rowIndex = 2
highland(input)
// Parse CSV return each row as object
.through(
csv({
separator: this.delimiter,
strict: true,
})
)
// Sort by SKU so later when reducing prices we can easily group by SKU
.sortBy((a, b) => a['variant-sku'].localeCompare(b['variant-sku']))
// Limit amount of rows to be handled at the same time
.batch(this.batchSize)
.stopOnError(err => {
this.logger.error(err)
output.emit('error', err)
})
.flatMap(highland)
// Unflatten object keys with a dot to nested values
.map(unflatten)
.map(this.transformPriceData)
parse(input: stream$Readable, output: stream$Writable) {
this.logger.info('Starting conversion')
highland(input, output)
.through(
csv({
separator: this.delimiter,
strict: true,
})
)
.map(CsvParserDiscountCode._removeEmptyFields)
.map(unflatten)
.map(this._cartDiscountsToArray)
.map(this._groupsToArray)
.map(castTypes)
.errors(this._handleErrors) // <- Pass errors to errorHandler
.stopOnError((error: HttpErrorType) => {
// <- Emit error and close stream if needed
output.emit('error', error)
})
.doto(() => {
this._rowIndex += 1
return new Promise((resolve, reject) => {
if (input === undefined) resolve({})
else if (input.match(/\.json$/i))
resolve(JSON.parse(fs.readFileSync(input)))
else if (input.match(/\.csv$/i)) {
const _attributes = []
fs.createReadStream(input)
.pipe(
csv({
separator: _args.delimiter,
strict: true,
})
)
.on('error', error => {
reject(error)
})
.on('data', data => {
const arrayDelim = _args.multiValueDelimiter
// pass to `prepareInput` to handle object formatting
_attributes.push(prepareInput(data, arrayDelim))
})
.on('end', () => {
resolve(unflatten(_attributes[0]))
})
}
_streamInput(input, output) {
let rowIndex = 1
return highland(input)
.through(
csv({
separator: this.csvConfig.delimiter,
strict: this.csvConfig.strictMode,
})
)
.stopOnError(err => {
this.logger.error(err)
return output.emit('error', err)
})
.batch(this.csvConfig.batchSize)
.doto(data => {
this.logger.verbose(`Parsed row-${rowIndex}: ${JSON.stringify(data)}`)
rowIndex += 1
})
.flatMap(highland)
.flatMap(data => data |> this._processData |> highland)
.stopOnError(err => {
parse(input: stream$Readable, output: stream$Writable) {
this.logger.info('Starting conversion')
highland(input)
.through(csv({ separator: this.csvConfig.delimiter, strict: true }))
.map(CsvParserState._removeEmptyFields)
.map(CsvParserState._parseInitialToBoolean)
.map(unflatten)
.map((state: StateWithStringTransitions): StateWithStringTransitions =>
CsvParserState._mapMultiValueFieldsToArray(
state,
this.csvConfig.multiValueDelimiter
)
)
.flatMap(
(
state: StateWithUnresolvedTransitions
): StateWithUnresolvedTransitions =>
highland(this._transformTransitions(state))
)
.errors((error: HttpErrorType, cb: Function): void =>