Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if (upstream) upstream.return(); // Close the existing stream.
upstream = store.call('watch', query, { skipFill: true });
let { value } = await upstream.next();
// console.log('Got first subscription value', value && debug(value));
if (typeof value === 'undefined') {
// The upstream is a change subscription, not a live query,
// so we need to fetch the initial value.
// TODO: Get a version corresponding to the subscription's start
// and verify that the store.read response is newer.
value = await store.call('read', unknown, { skipFill: true });
}
value = slice(value, unknown).known;
putValue(value, false);
} catch (e) {
error(e);
}
putStream(upstream);
}
if (raw) merge(payload, value);
// console.log('Payload after adding value');
}
let { known, unknown, extraneous } = slice(data, originalQuery);
data = known || [];
// console.log('After slice', debug(data), unknown && debug(unknown));
// console.log('Payload and value', debug(payload), value && debug(value));
if (isChange && value && unknown) {
// The sieve may have removed some necessary data (that we weren't aware
// was necessary). Get it back.
// console.log('Here');
const valueParts = slice(value, unknown);
if (valueParts.known) {
merge(data, valueParts.known);
if (raw) merge(payload, valueParts.known);
unknown = valueParts.unknown;
}
}
// This is not an else; previous block might update unknown.
if (!unknown) {
// console.log('Pushing', payload);
push(raw ? payload : data);
payload = [];
}
if (unknown || extraneous) resubscribe(unknown, extraneous);
}
// console.log('Fill/subscribe: PutValue', value);
if (isChange) {
// console.log('Data before sieve', debug(data));
const sieved = sieve(data, value);
// console.log('Data after sieve', debug(data));
if (!sieved.length) return;
merge(payload, sieved);
} else {
merge(data, value);
// console.log('Payload before adding value', debug(payload));
if (raw) merge(payload, value);
// console.log('Payload after adding value');
}
let { known, unknown, extraneous } = slice(data, originalQuery);
data = known || [];
// console.log('After slice', debug(data), unknown && debug(unknown));
// console.log('Payload and value', debug(payload), value && debug(value));
if (isChange && value && unknown) {
// The sieve may have removed some necessary data (that we weren't aware
// was necessary). Get it back.
// console.log('Here');
const valueParts = slice(value, unknown);
if (valueParts.known) {
merge(data, valueParts.known);
if (raw) merge(payload, valueParts.known);
unknown = valueParts.unknown;
}
// 'get' handler: fetches the value for `query` from the next handler and,
// unless `options.skipFill` is set, keeps fetching whatever `slice` reports
// as still unknown until nothing is missing.
//
// Returns the value from `next(query)` untouched when skipFill is set;
// otherwise returns the known portion of the value once `slice` reports no
// unknown part.
// Throws Error('fill.max_recursion') if MAX_RECURSIONS passes were not enough
// to resolve every unknown part.
store.on('get', [], async function(query, options, next) {
  let value = await next(query);
  if (options.skipFill) return value;

  // A counted loop makes the two outcomes unambiguous: returning from inside
  // the loop means the fill completed, falling out of it means the budget ran
  // out. (Testing a post-decremented counter for zero after the loop confuses
  // the two: it is -1 on exhaustion and 0 after a success on the last pass.)
  for (let pass = 0; pass < MAX_RECURSIONS; pass++) {
    const { known, unknown } = slice(value, query);
    value = known;
    if (!unknown) return value;
    // skipFill prevents the nested get from running this fill loop again.
    merge(value, await store.get(unknown, { skipFill: true }));
  }

  throw new Error('fill.max_recursion');
});