Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
function putValue(value, isChange) {
if (typeof value === 'undefined') return;
// console.log('Fill/subscribe: PutValue', value);
if (isChange) {
// console.log('Data before sieve', debug(data));
const sieved = sieve(data, value);
// console.log('Data after sieve', debug(data));
if (!sieved.length) return;
merge(payload, sieved);
} else {
merge(data, value);
// console.log('Payload before adding value', debug(payload));
if (raw) merge(payload, value);
// console.log('Payload after adding value');
}
let { known, unknown, extraneous } = slice(data, originalQuery);
data = known || [];
// console.log('After slice', debug(data), unknown && debug(unknown));
// console.log('Payload and value', debug(payload), value && debug(value));
if (isChange && value && unknown) {
// The sieve may have removed some necessary data (that we weren't aware
// was necessary). Get it back.
async putValue(value) {
/*
Merge value into cache, and get the change tree
extraChanges = await normalizeSubscriptions()
If raw, merge extraChanges into change and call listeners with that;
Othersiwe, call listeners with values from cache
*/
if (typeof value === 'undefined') value = {};
merge(this.data, value);
// console.log(this.id, 'value', value);
const gaps = getUnknown(this.data, this.sumQuery);
let fillData;
if (gaps) {
// The change added a link to some data we don't have in the cache.
// We need to fetch it and also resubscribe.
// console.log(
// 'Change caused gaps',
// value && value.visitorsByTime && value.visitorsByTime.__page__,
// this.data &&
// this.data.visitorsByTime &&
// this.data.visitorsByTime.__page__,
// );
// console.log('SumQuery', this.sumQuery);
// console.log('Gaps', gaps);
// this.data.visitorsByTime.__page__,
// );
// console.log('SumQuery', this.sumQuery);
// console.log('Gaps', gaps);
await this.resubscribe();
try {
fillData = await this.store.get(gaps, {
skipCache: true,
raw: true,
});
} catch (e) {
console.error('Error getting fillData for', gaps);
console.error(e);
}
if (fillData) {
merge(this.data, fillData);
merge(value, fillData);
}
}
for (const id in this.listeners) {
const { lastQuery = {}, originalQuery, push } = this.listeners[id];
const nextQuery = linkKnown(this.data, originalQuery);
const query = addQueries(lastQuery, nextQuery);
this.listeners[id].lastQuery = nextQuery;
const payload = getKnown(value, query);
if (payload) push(payload);
}
}
}
// );
// console.log('SumQuery', this.sumQuery);
// console.log('Gaps', gaps);
await this.resubscribe();
try {
fillData = await this.store.get(gaps, {
skipCache: true,
raw: true,
});
} catch (e) {
console.error('Error getting fillData for', gaps);
console.error(e);
}
if (fillData) {
merge(this.data, fillData);
merge(value, fillData);
}
}
for (const id in this.listeners) {
const { lastQuery = {}, originalQuery, push } = this.listeners[id];
const nextQuery = linkKnown(this.data, originalQuery);
const query = addQueries(lastQuery, nextQuery);
this.listeners[id].lastQuery = nextQuery;
const payload = getKnown(value, query);
if (payload) push(payload);
}
}
}
async init() {
// We need this line to be a separate function because the
// constructor can't be async. We're okay even if pub is
// called before this happens.
const { options, query } = this;
let data = await options.resolve(query);
merge(data, this.earlyChange);
// TODO: Properly resolve, getKnown etc. after early changes are merged.
delete this.earlyChange;
this.data = data = getKnown(data, query) || {};
this.push(options.values ? graft(data, query) || {} : data);
}
async init() {
// We need this line to be a separate function because the
// constructor can't be async. We're okay even if pub is
// called before this happens.
const { options, query } = this;
let data = await options.resolve(query);
merge(data, this.earlyChange);
// TODO: Properly resolve, getKnown etc. after early changes are merged.
delete this.earlyChange;
this.data = data = getKnown(data, query) || {};
this.push(options.values ? graft(data, query) || {} : data);
}
async resubscribe() {
const query =
this.sumQuery && linkKnown(this.data, simplifyQuery(this.sumQuery));
// console.log(this.id, 'Resubscribe called', query);
if (isEqual(this.upstreamQuery, query)) return;
if (this.upstream) {
// console.log(this.id, 'Closing upstream sub', this.upstreamQuery);
this.upstream.return();
}
this.upstreamQuery = query;
if (!query) return;
// console.log(this.id, 'Opening upstream sub', query);
this.upstream = this.store.sub(query, { skipCache: true, raw: true });
this.putStream(this.upstream);
}
getStream(query) {
const id = this.lastListenerId++;
const [push, stream] = makeStream(() => {
delete this.listeners[id];
this.sumQuery = subtractQueries(this.sumQuery, query);
// console.log(this.id, 'Ended stream', id, query, this.sumQuery);
// console.log(this.id, 'Resubscribe due to stream close');
return this.resubscribe();
});
this.listeners[id] = {
originalQuery: query,
query: linkKnown(this.data, query),
push,
};
this.sumQuery = addQueries(this.sumQuery, query);
// console.log(this.id, 'Started stream', id, query, this.sumQuery);
// console.log(this.id, 'Resubscribe due to query start');
this.resubscribe();
return stream;
}
/**
 * Builds a change that removes one randomly chosen visitor.
 *
 * Picks a random integer below `id` that is not yet in `freeIds`, records it
 * there, looks up that visitor's `ts` in `state`, increments the `leave`
 * counter and returns a graph that nulls out the visitor both under
 * `visitors` and under `visitorsByTime`.
 *
 * Note: the draw loop only ends once an id outside `freeIds` is drawn.
 *
 * @returns {*} Whatever `graph` returns for the removal tree at `ts`.
 */
function simulateLeave() {
  // Keep drawing until we hit an id that has not been freed yet.
  let picked = Math.floor(Math.random() * id);
  while (freeIds.has(picked)) {
    picked = Math.floor(Math.random() * id);
  }
  freeIds.add(picked);

  // Visitor keys are strings, while freeIds holds the numeric id.
  const visitorKey = String(picked);
  const visitorTs = unwrap(state, ['visitors', visitorKey, 'ts']);
  leave++;

  const removal = {
    visitors: { [visitorKey]: null },
    visitorsByTime: { [visitorTs]: null },
  };
  return graph(removal, ts);
}
if (upstream) upstream.return(); // Close the existing stream.
upstream = store.call('watch', query, { skipFill: true });
let { value } = await upstream.next();
// console.log('Got first subscription value', value && debug(value));
if (typeof value === 'undefined') {
// The upstream is a change subscription, not a live query,
// so we need to fetch the initial value.
// TODO: Get a version corresponding to the subscription's start
// and verify that the store.read response is newer.
value = await store.call('read', unknown, { skipFill: true });
}
value = slice(value, unknown).known;
putValue(value, false);
} catch (e) {
error(e);
}
putStream(upstream);
}