Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
export default function subscribe(store, originalQuery, raw) {
let push, end;
let upstream;
let query = [];
let data = [];
let payload = [];
resubscribe(originalQuery);
return makeStream((streamPush, streamEnd) => {
push = v => {
// console.log('Push', debug(v));
streamPush(v);
};
end = streamEnd;
return unsubscribe;
});
async function resubscribe(unknown, _extraneous) {
try {
const changed = add(query, unknown);
// console.log('Resubscribe');
if (!changed) return;
if (upstream) upstream.return(); // Close the existing stream.
upstream = store.call('watch', query, { skipFill: true });
pushRemaining = push;
});
for await (const value of next(nextPayload)) {
const unwrappedValue = unwrap(value, path);
const remainingValue = remove(value, path);
if (remainingValue) pushRemaining(remainingValue);
if (unwrappedValue) yield unwrappedValue;
}
};
const unwrappedStream = fn(unwrappedPayload, options, shiftedNext);
// We expect next() to be called before the first value is yielded.
const firstValue = await (await unwrappedStream.next()).value;
const resultStream = makeStream(push => {
push(wrap(firstValue, path));
mapStream(unwrappedStream, value => {
push(wrap(value, path));
});
return () => unwrappedStream.return();
});
if (!nextCalled && remainingPayload.length) {
remainingNextStream = next(remainingPayload);
}
yield* remainingNextStream
? mergeStreams(resultStream, remainingNextStream)
: resultStream;
};
}
/**
 * Async generator that forwards a request to `next` and splits every value
 * that comes back into two parts, using the enclosing scope's `path`.
 *
 * Side effects on enclosing-scope variables:
 *  - `nextCalled` is set to `true` as soon as the generator starts running.
 *  - `remainingNextStream` is replaced with a new stream from `makeStream`;
 *    the truthy results of `remove(value, path)` are pushed into it.
 *
 * @param {*} unwrappedNextPayload - Payload that is passed through
 *   `wrap(…, path)` before being handed to `next`.
 * @yields {*} Each truthy result of `unwrap(value, path)`.
 */
const shiftedNext = async function*(unwrappedNextPayload) {
  nextCalled = true;

  // Re-wrap the payload and, when there is any remaining payload, merge it in
  // so a single `next` call carries both.
  const outgoingPayload = wrap(unwrappedNextPayload, path);
  if (remainingPayload.length) {
    merge(outgoingPayload, remainingPayload);
  }

  // Capture the push function of the new stream so the loop below can feed it.
  // NOTE(review): assumes makeStream invokes its callback synchronously, so
  // the push function is assigned before the first value arrives — confirm.
  let emitRemaining;
  const captureEmitter = emit => {
    emitRemaining = emit;
  };
  remainingNextStream = makeStream(captureEmitter);

  for await (const incoming of next(outgoingPayload)) {
    const insidePart = unwrap(incoming, path);
    const outsidePart = remove(incoming, path);

    if (outsidePart) {
      emitRemaining(outsidePart);
    }
    if (!insidePart) {
      continue;
    }
    yield insidePart;
  }
};