Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
getStream(query) {
const id = this.lastListenerId++;
const [push, stream] = makeStream(() => {
delete this.listeners[id];
this.sumQuery = subtractQueries(this.sumQuery, query);
// console.log(this.id, 'Ended stream', id, query, this.sumQuery);
// console.log(this.id, 'Resubscribe due to stream close');
return this.resubscribe();
});
this.listeners[id] = {
originalQuery: query,
query: linkKnown(this.data, query),
push,
};
this.sumQuery = addQueries(this.sumQuery, query);
// console.log(this.id, 'Started stream', id, query, this.sumQuery);
// console.log(this.id, 'Resubscribe due to query start');
this.resubscribe();
return stream;
}
pubs[id] = async change => {
if (earlyChange) {
merge(earlyChange, change);
return;
}
// Returns early if the change does not have any overlap with the query.
// DO NOT getKnown the change to only those changes that overlap; when the
// overlapping portion includes a deletion in a range, the change set
// may contain additional items to make up.
if (!getKnown(change, linkKnown(data, query))) return;
merge(data, change);
const nextQuery = getUnknown(data, query);
if (nextQuery) {
const linked = await store.get(nextQuery, options);
merge(data, linked);
if (!options.values) merge(change, linked);
}
data = getKnown(data, query);
push(options.values ? graft(data, query) || {} : change);
};
const pub = async change => {
if (earlyChange) {
merge(earlyChange, change);
return;
}
// Returns early if the change does not have any overlap with the query.
// DO NOT getKnown the change to only those changes that overlap; when the
// overlapping portion includes a deletion in a range, the change set
// may contain additional items to make up.
if (!getKnown(change, linkKnown(data, query))) return;
merge(data, change);
const nextQuery = getUnknown(data, query);
if (nextQuery) {
const linked = await options.resolve(nextQuery);
merge(data, linked);
if (!options.values) merge(change, linked);
}
data = getKnown(data, query);
push(options.values ? graft(data, query) || {} : change);
};
skipCache: true,
raw: true,
});
} catch (e) {
console.error('Error getting fillData for', gaps);
console.error(e);
}
if (fillData) {
merge(this.data, fillData);
merge(value, fillData);
}
}
for (const id in this.listeners) {
const { lastQuery = {}, originalQuery, push } = this.listeners[id];
const nextQuery = linkKnown(this.data, originalQuery);
const query = addQueries(lastQuery, nextQuery);
this.listeners[id].lastQuery = nextQuery;
const payload = getKnown(value, query);
if (payload) push(payload);
}
}
}