function extractShrinkwrap (pkgStream, opts, cb) {
  var extract = tar.extract()
  // The extra `through` is to compensate for misbehaving `pkgStream`s.
  // For example, `request` streams are notoriously unreliable.
  // This is a bit of defensive programming, not a fix for
  // a specific known example of an issue.
  var unzipped = pipeline(through(), gunzip(), extract)
  var shrinkwrap = null // we'll pop the data in here if found.
  extract.on('entry', function onEntry (header, fileStream, next) {
    if (header.name === 'package/npm-shrinkwrap.json') {
      opts.log.silly('extract-shrinkwrap', 'found shrinkwrap')
      // got a shrinkwrap! Now we don't need to look for entries anymore.
      extract.removeListener('entry', onEntry)
      // Grab all the file data off the entry fileStream.
      var data = ''
      fileStream.on('data', function (d) { data += d })
      finished(fileStream, function (err) {
        if (err) { return extract.emit('error', err) }
        try {
          shrinkwrap = JSON.parse(data)
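// The excerpt above cuts off inside the try block; the pattern it shows is
// "pipe an (optionally gzipped) tarball stream through gunzip into a tar
// extractor and pick out one entry". A minimal, self-contained sketch of that
// pattern follows, assuming the mississippi, gunzip-maybe and tar-stream
// packages (which appear to be what the excerpt uses); readEntryNames is a
// made-up helper name, not part of the code above.
var miss = require('mississippi')
var gunzipMaybe = require('gunzip-maybe')
var tarStream = require('tar-stream')

function readEntryNames (pkgStream, cb) {
  var extract = tarStream.extract()
  var names = []
  extract.on('entry', function (header, fileStream, next) {
    names.push(header.name)
    fileStream.on('end', next)
    fileStream.resume() // drain this entry so tar-stream can parse the next one
  })
  // The leading through() mirrors the defensive buffering used above for
  // source streams that may start emitting before everything is wired up.
  miss.pipe(pkgStream, miss.through(), gunzipMaybe(), extract, function (err) {
    if (err) return cb(err)
    cb(null, names)
  })
}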
function extractStream (dest, opts) {
  opts = opts || {}
  const sawIgnores = {}
  return pipeline(gunzip(), tar.extract(dest, {
    map: (header) => {
      // uid/gid are defined outside this excerpt (typically derived from opts)
      if (uid != null) { header.uid = uid }
      if (gid != null) { header.gid = gid }
      // Note: This mirrors logic in the fs read operations that are
      // employed during tarball creation, in the fstream-npm module.
      // It is duplicated here to handle tarballs that are created
      // using other means, such as system tar or git archive.
      if (header.type === 'file') {
        const base = path.basename(header.name)
        if (base === '.npmignore') {
          sawIgnores[header.name] = true
        } else if (base === '.gitignore') {
          const npmignore = header.name.replace(/\.gitignore$/, '.npmignore')
          if (!sawIgnores[npmignore]) {
            // Rename, may be clobbered later.
            header.name = npmignore
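// The excerpt above is cut off inside the map function, but the shape of the
// API is clear: extractStream(dest, opts) returns a single duplex
// (gunzip -> tar extraction) that a caller can pipe a tarball into. A hedged
// usage sketch; the file and destination paths are made up for illustration.
var fs = require('fs')
var miss = require('mississippi')

miss.pipe(
  fs.createReadStream('package.tgz'),
  extractStream('/tmp/unpacked'),
  function (err) {
    if (err) throw err
    console.log('tarball extracted; stray .gitignore files renamed to .npmignore')
  }
)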
  return BB.try(() => {
    if (opts.stream && opts.gzip) {
      return pipeline(pack, zlib.createGzip())
    } else if (opts.stream) {
      return pack
    } else {
      return getStream.buffer(pack).then(ret => {
        if (opts.gzip) {
          return BB.fromNode(cb => zlib.gzip(ret, cb))
        } else {
          return ret
        }
      })
    }
  })
}
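// The BB.try block above decides between handing back a live (optionally
// gzipped) stream and buffering the whole pack stream into memory first. A
// sketch of the same decision using plain promises instead of Bluebird,
// assuming pipeline() is mississippi's (as in the excerpts above) and that
// the get-stream package is available; packResult is an illustrative name.
var util = require('util')
var zlib = require('zlib')
var getStream = require('get-stream')
var pipeline = require('mississippi').pipeline
var gzipBuffer = util.promisify(zlib.gzip)

function packResult (pack, opts) {
  if (opts.stream) {
    // stream mode: return the pack stream, gzipping on the fly if requested
    return Promise.resolve(opts.gzip ? pipeline(pack, zlib.createGzip()) : pack)
  }
  // buffer mode: collect the whole stream, then gzip the buffer if requested
  return getStream.buffer(pack).then(function (buf) {
    return opts.gzip ? gzipBuffer(buf) : buf
  })
}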
      current: documentCount,
      total: '?',
      update: true
    })
    lastReported = now
  }
  cb(null, chunk)
}
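// The lines above are the tail of a throttled progress-reporting transform: it
// counts documents, calls a progress callback at most every so often, and
// passes each chunk through unchanged. A sketch of what such a transform can
// look like, assuming mississippi's through; the 500 ms interval and the
// callback's field names here are assumptions, not the package's exact values.
var miss = require('mississippi')

function reportProgress (onProgress) {
  var documentCount = 0
  var lastReported = 0
  return miss.through(function (chunk, enc, cb) {
    documentCount++
    var now = Date.now()
    if (now - lastReported > 500) {
      onProgress({current: documentCount, total: '?', update: true})
      lastReported = now
    }
    cb(null, chunk) // always pass the chunk along untouched
  })
}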
const inputStream = await getDocumentsStream(options.client, options.dataset)
debug('Got HTTP %d', inputStream.statusCode)
debug('Response headers: %o', inputStream.headers)

const jsonStream = miss.pipeline(
  inputStream,
  logFirstChunk(),
  split(JSON.parse),
  rejectOnApiError(),
  filterSystemDocuments(),
  assetStreamHandler,
  filterDocumentTypes(options.types),
  options.drafts ? miss.through.obj() : filterDrafts(),
  stringifyStream(),
  miss.through(reportDocumentCount)
)

miss.finished(jsonStream, async err => {
  if (err) {
    return
  }
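// The export pipeline above chains a dozen transforms with miss.pipeline so the
// rest of the code can treat them as one stream. A reduced sketch of the same
// ndjson pattern, assuming mississippi and split2; the filter predicate below
// is purely illustrative, not the actual filtering rules of the code above.
var miss = require('mississippi')
var split = require('split2')

function filterByType (types) {
  return miss.through.obj(function (doc, enc, cb) {
    if (types && types.indexOf(doc._type) === -1) {
      return cb() // drop documents whose type was not requested
    }
    cb(null, doc)
  })
}

var ndjsonFilter = miss.pipeline.obj(
  split(JSON.parse), // one parsed document per input line
  filterByType(['post']), // keep only the requested types
  miss.through.obj(function (doc, enc, cb) {
    cb(null, JSON.stringify(doc) + '\n') // re-serialize as ndjson
  })
)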
  return peek({newline: false, maxBuffer: 300}, (data, swap) => {
    if (isTar(data)) {
      debug('Stream is a tarball, extracting to %s', outputPath)
      isTarStream = true
      return swap(null, tar.extract(outputPath))
    }

    debug('Stream is an ndjson file, streaming JSON')
    const jsonStreamer = getJsonStreamer()
    const concatter = miss.concat(resolveNdjsonStream)
    const ndjsonStream = miss.pipeline(jsonStreamer, concatter)
    ndjsonStream.on('error', err => {
      uncompressStream.emit('error', err)
      destroy([uncompressStream, jsonStreamer, concatter, ndjsonStream])
      reject(err)
    })
    return swap(null, ndjsonStream)
  })
}
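// peek-stream (used above) buffers the first bytes of input, lets you inspect
// them, then "swaps" in the stream that should really handle the data. A small
// sketch of the same routing idea, assuming the peek-stream package; here the
// check is just the gzip magic bytes rather than isTar():
var peek = require('peek-stream')
var zlib = require('zlib')
var miss = require('mississippi')

function gunzipIfNeeded () {
  return peek({newline: false, maxBuffer: 10}, function (data, swap) {
    var isGzipped = data.length > 2 && data[0] === 0x1f && data[1] === 0x8b
    // swap(err, stream): buffered and future data is piped into `stream`
    swap(null, isGzipped ? zlib.createGunzip() : miss.through())
  })
}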
new Promise((resolve, reject) => {
  const outputPath = path.join(tempy.directory(), 'sanity-import')
  debug('Importing from stream')

  let isTarStream = false
  let jsonDocuments

  const uncompressStream = miss.pipeline(gunzipMaybe(), untarMaybe())
  miss.pipe(
    stream,
    uncompressStream,
    err => {
      if (err) {
        reject(err)
        return
      }

      if (isTarStream) {
        findAndImport()
      } else {
        resolve(importers.fromArray(jsonDocuments, options))
      }
    }
  )

function importPipeline (jawn, opts) {
  if (!opts) opts = {}

  var writeStream = jawn.core.createWriteStream(opts)
  var parser = parseInputStream(opts)
  var transform = through.obj(stringifyData, end)
  var importStream = miss.pipeline(parser, transform, writeStream)

  function stringifyData (data, enc, next) {
    this.push(JSON.stringify(data))
    next()
  }

  function end (done) {
    done()
  }

  importStream.writeStream = writeStream
  return importStream
}
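// Hypothetical usage of importPipeline() above; the input file name and the
// shape of the `jawn` object are assumptions for illustration. The exposed
// writeStream property presumably lets callers observe the underlying store
// directly, e.g. to know when it has flushed.
var fs = require('fs')
var miss = require('mississippi')

var importer = importPipeline(jawn, {})
importer.writeStream.on('finish', function () {
  console.log('all rows written to the underlying store')
})
miss.pipe(fs.createReadStream('input-data'), importer, function (err) {
  if (err) console.error('import failed:', err)
})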
        memoData && memoData.push(c)
        memoLength += c.length
        cb(null, c, en)
      },
      (cb) => {
        memoData &&
          memo.put.byDigest(
            cache,
            integrity,
            Buffer.concat(memoData, memoLength),
            opts
          )
        cb()
      }
    )
    stream = pipeline(stream, memoStream)
  }
  return stream
}
}
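// The through() above tees each chunk into an in-memory array while passing it
// downstream, then hands the concatenated buffer to the memoization cache in
// the flush function. A generic sketch of that tee-while-streaming pattern,
// assuming mississippi; teeToBuffer is an illustrative name, not cacache's API.
var miss = require('mississippi')

function teeToBuffer (onDone) {
  var chunks = []
  var length = 0
  return miss.through(
    function (chunk, enc, cb) {
      chunks.push(chunk)
      length += chunk.length
      cb(null, chunk) // pass the chunk through unchanged
    },
    function (cb) {
      onDone(Buffer.concat(chunks, length)) // hand over the buffered copy on flush
      cb()
    }
  )
}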