Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
function putStream (cache, key, opts) {
opts = opts || {}
let integrity
let size
const contentStream = write.stream(
cache, opts
).on('integrity', int => {
integrity = int
}).on('size', s => {
size = s
})
let memoData
let memoTotal = 0
const stream = to((chunk, enc, cb) => {
contentStream.write(chunk, enc, () => {
if (opts.memoize) {
if (!memoData) { memoData = [] }
memoData.push(chunk)
memoTotal += chunk.length
}
cb()
})
}, cb => {
contentStream.end(() => {
// TODO - stop modifying `opts`
opts.size = size
index.insert(cache, key, integrity, opts).then(entry => {
if (opts.memoize) {
memo.put(cache, entry, Buffer.concat(memoData, memoTotal), opts)
}
function writeStream (cache, opts) {
opts = opts || {}
const inputStream = new PassThrough()
let inputErr = false
function errCheck () {
if (inputErr) { throw inputErr }
}
let allDone
const ret = to((c, n, cb) => {
if (!allDone) {
allDone = handleContent(inputStream, cache, opts, errCheck)
}
inputStream.write(c, n, cb)
}, cb => {
inputStream.end(() => {
if (!allDone) {
const e = new Error(Y`Cache input stream was empty`)
e.code = 'ENODATA'
return ret.emit('error', e)
}
allDone.then(res => {
res.integrity && ret.emit('integrity', res.integrity)
res.size !== null && ret.emit('size', res.size)
cb()
}, e => {
function writeStream (cache, opts) {
opts = opts || {}
const inputStream = new PassThrough()
let inputErr = false
function errCheck () {
if (inputErr) {
throw inputErr
}
}
let allDone
const ret = to(
(c, n, cb) => {
if (!allDone) {
allDone = handleContent(inputStream, cache, opts, errCheck)
}
inputStream.write(c, n, cb)
},
(cb) => {
inputStream.end(() => {
if (!allDone) {
const e = new Error('Cache input stream was empty')
e.code = 'ENODATA'
return ret.emit('error', e)
}
allDone.then(
(res) => {
res.integrity && ret.emit('integrity', res.integrity)
stream.unpipe()
stream.end()
} else if (opts.cache) {
opts.log.silly('registry.get', 'request successful. streaming data to cache')
var localopt = Object.create(opts)
localopt.metadata = {
etag: res.headers['etag'],
lastModified: res.headers['last-modified'],
cacheControl: res.headers['cache-conrol'],
time: +(new Date())
}
var cacheStream = cache.put.stream(opts.cache, key, localopt)
pipe(
res,
decoder,
to(function (chunk, enc, cb) {
cacheStream.write(chunk, enc)
stream.write(chunk, enc, cb)
}, function (cb) {
opts.log.silly('registry.get', 'request for', key, 'done.')
cacheStream.on('error', cb)
cacheStream.end(function () {
opts.log.silly('registry.get', 'wrapped up cacheStream')
stream.end(cb)
})
}),
function (err) {
if (err) { return stream.emit('error', err) }
opts.log.silly('registry.get', 'pipeline for', key, 'done')
}
)
} else {
function putStream (cache, key, opts) {
opts = PutOpts(opts)
let integrity
let size
const contentStream = write
.stream(cache, opts)
.on('integrity', (int) => {
integrity = int
})
.on('size', (s) => {
size = s
})
let memoData
let memoTotal = 0
const stream = to(
(chunk, enc, cb) => {
contentStream.write(chunk, enc, () => {
if (opts.memoize) {
if (!memoData) {
memoData = []
}
memoData.push(chunk)
memoTotal += chunk.length
}
cb()
})
},
(cb) => {
contentStream.end(() => {
index
.insert(cache, key, integrity, opts.concat({ size }))