Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
onCompleted: sinon.stub()
}
}
this.mocks.QueryExecutor = sinon.stub().returns(this.mocks.queryExecutor)
this.mocks.Responder = sinon.stub().returns(this.mocks.responder)
this.mocks.SubscribeCallbacks = sinon.stub().returns(this.mocks.subscribeCallbacks)
// proxyquire
this.DataHandler = proxyquire('../src/server/data-handler.js', {
'./active-subscriptions.js': this.mocks.activeSubscriptions,
'./query-executor.js': this.mocks.QueryExecutor,
'./responder.js': this.mocks.Responder,
'./subscribe-callbacks.js': this.mocks.SubscribeCallbacks
})
var toServer = through2.obj()
var toClient = through2.obj()
this.primusClient = duplexify.obj(toServer, toClient)
this.spark = duplexify.obj(toClient, toServer)
this.spark.id = 'sparkId'
})
emitError: true
}, options);
const forkStream = new ForkStream({
classifier: (file, cb) => cb(null, !!condition(file))
});
forkStream.a.pipe(conditionStream);
// merge-stream package cannot be updated because it emits the error
// from conditionStream to mergedStream
const mergedStream = mergeStream(forkStream.b, conditionStream);
const outStream = through2.obj();
mergedStream.pipe(outStream);
const duplexStream = duplexify.obj(forkStream, outStream);
if (options.emitError) {
conditionStream.on('error', err => duplexStream.emit('error', err));
}
return duplexStream;
}
};
? new NodeDefaultCryptographicMaterialsManager(cmm)
: cmm
const parseHeaderStream = new ParseHeaderStream(cmm)
const verifyStream = new VerifyStream({ maxBodySize })
const decipherStream = getDecipherStream()
/* pipeline will _either_ stream.destroy or the callback.
* decipherStream uses destroy to dispose the material.
* So I tack a pass though stream onto the end.
*/
pipeline(parseHeaderStream, verifyStream, decipherStream, new PassThrough(), (err: Error) => {
if (err) stream.emit('error', err)
})
const stream = new Duplexify(parseHeaderStream, decipherStream)
// Forward header events
parseHeaderStream
.once('MessageHeader', header => stream.emit('MessageHeader', header))
return stream
}
const getOrSetStream = (key, getStreamFn, opts = {}) => {
const proxy = duplexify()
const onError = (err) => {
proxy.destroy(err)
}
const getKey = opts.refresh
? () => Promise.reject(new KeyNotExistsError(key))
: () => get(key, { stream: true })
getKey()
.then((rs) => {
proxy.setReadable(rs)
// since already cached, we can fire 'finish' event
proxy.emit('finish')
})
.catch((err) => {
if (err instanceof KeyNotExistsError) {
cmm: KeyringNode|NodeMaterialsManager,
op: EncryptStreamInput = {}
): Duplex {
const { suiteId, encryptionContext = {}, frameLength = FRAME_LENGTH, plaintextLength } = op
/* Precondition: The frameLength must be less than the maximum frame size Node.js stream. */
needs(frameLength > 0 && Maximum.FRAME_SIZE >= frameLength, `frameLength out of bounds: 0 > frameLength >= ${Maximum.FRAME_SIZE}`)
/* If the cmm is a Keyring, wrap it with NodeDefaultCryptographicMaterialsManager. */
cmm = cmm instanceof KeyringNode
? new NodeDefaultCryptographicMaterialsManager(cmm)
: cmm
const suite = suiteId && new NodeAlgorithmSuite(suiteId)
const wrappingStream = new Duplexify()
cmm.getEncryptionMaterials({ suite, encryptionContext, plaintextLength })
.then(async (material) => {
const { dispose, getSigner } = getEncryptHelper(material)
const { getCipher, messageHeader, rawHeader } = getEncryptionInfo(material, frameLength)
wrappingStream.emit('MessageHeader', messageHeader)
const encryptStream = getFramedEncryptStream(getCipher, messageHeader, dispose, plaintextLength)
const signatureStream = new SignatureStream(getSigner)
pipeline(encryptStream, signatureStream)
wrappingStream.setReadable(signatureStream)
// Flush the rawHeader through the signatureStream
// defer piping, so consumer can attach event listeners
// otherwise we might lose events
process.nextTick(() => {
duplex.pipe(this._parser)
})
this._generator.on('error', this.emit.bind(this, 'error'))
this._parser.on('error', this.emit.bind(this, 'error'))
this.stream = duplex
duplex.on('error', this.emit.bind(this, 'error'))
duplex.on('close', this.emit.bind(this, 'close'))
Duplexify.call(this, this._generator, this._parser, { objectMode: true })
// MQTT.js basic default
if (opts.notData !== true) {
var that = this
this.once('data', function (connectPacket) {
that.setOptions(connectPacket)
that.on('data', emitPacket)
if (cb) {
cb()
}
that.emit('data', connectPacket)
})
}
}
GrpcService.prototype.requestWritableStream = function(protoOpts, reqOpts) {
var stream = protoOpts.stream = protoOpts.stream || duplexify.obj();
if (global.GCLOUD_SANDBOX_ENV) {
return stream;
}
var self = this;
if (!this.grpcCredentials) {
// We must establish an authClient to give to grpc.
this.getGrpcCredentials_(function(err, credentials) {
if (err) {
stream.destroy(err);
return;
}
self.grpcCredentials = credentials;
function createStream (script, opts_) {
var opts = opts_ || {}
var duplex = duplexify.obj(null, null)
function errback(err) {
if (err) duplex.destroy(err)
}
// Find the cscript binary. If we're on 64-bit Windows and 32-bit
// Node.js then prefer the native "Sysnative\cscript.exe", because
// otherwise we would be redirected to "SysWow64\cscript.exe" and
// then be unable to access the native registry (without resorting
// to the slower ExecMethod). See also win-detect-browsers#18.
wbin('cscript', { native: opts.native }, function(err, bin) {
if (err) return duplex.destroy(err)
var args = ['//Nologo', '//B', resolve(script)].concat(opts.args || [])
var child = execFile(bin, args, errback)
// special constructor treatment for native websockets in browsers, see
// https://github.com/maxogden/websocket-stream/issues/82
if (isNative && isBrowser) {
socket = new WS(target, protocols)
} else {
socket = new WS(target, protocols, options)
}
socket.binaryType = 'arraybuffer'
}
// was already open when passed in
if (socket.readyState === socket.OPEN) {
stream = proxy
} else {
stream = duplexify.obj()
socket.onopen = onopen
}
stream.socket = socket
socket.onclose = onclose
socket.onerror = onerror
socket.onmessage = onmessage
proxy.on('close', destroy)
var coerceToBuffer = !options.objectMode
function socketWriteNode(chunk, enc, next) {
// avoid errors, this never happens unless
// destroy() is called
function StreamChannels (opts, onchannel) {
if (!(this instanceof StreamChannels)) return new StreamChannels(opts, onchannel)
if (typeof opts === 'function') {
onchannel = opts
opts = null
}
if (!opts) opts = {}
duplexify.call(this)
var self = this
this.destroyed = false
this.limit = opts.limit || 1024
this.state = null // set by someone else. here for perf
this._outgoing = []
this._incoming = []
this._waiting = 0
this._encode = new Sink()
this._decode = lpstream.decode({allowEmpty: true, limit: opts.messageLimit || 5 * 1024 * 1024})
this._decode.on('data', parse)
this._keepAlive = 0
this._remoteKeepAlive = 0