Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
});
socket.addEventListener('close', () => {
try {
c.close();
} catch(e) {
// The stream has already been closed.
}
});
socket.addEventListener('error', (evt) => c.error(evt.data || evt));
},
cancel: function() {
maybeCloseSocket();
}
});
this.writable = new streams.WritableStream({
start: function(error) {
socket.addEventListener('error', (evt) => error(evt.data || evt));
},
write: function(chunk) {
socket.send(chunk);
// We don't know when send completes, so this is synchronous.
},
close: function() {
maybeCloseSocket();
}
})
}
if (knownFilteredSet.delete(change.id)) {
change = shallowClone(change);
mutateChangeToResembleDeletion(change);
enqueue(change);
}
}
done();
});
}
},
writableStrategy: new CountQueuingStrategy({ highWaterMark: 1 }),
readableStrategy: new CountQueuingStrategy({ highWaterMark: 1 })
});
//bufferingStream.readable.pipeTo(gatherStream.writable);
gatherStream.readable.pipeThrough(filterStream).pipeTo(new WritableStream({
start() {
},
write(change) {
onFilteredUpdate(change);
},
close() {
// I don't think anything actually cares? Unless we should be propagating
// through to the moot callback?
},
abort(ex) {
logic(ctx, 'filteringStreamAbortError', { ex, stack: ex.stack });
}
}, new CountQueuingStrategy({ highWaterMark: 1 })));
return {
/**