Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
/**
 * Registers a listener for `query`, creates a push/stream pair for it, adds
 * the query to the aggregate `this.sumQuery`, and triggers a resubscribe.
 *
 * NOTE(review): this listing looks like two excerpts joined together. The
 * statements from `merge(data, change);` onward use names (`data`, `change`,
 * `options`, `signal`) that are not declared in the visible code, and `await`
 * appears without a visible `async` function. Confirm against the original
 * sources before relying on that part.
 *
 * @param {*} query - Query for the new listener; stored as `originalQuery`
 *   and added to `this.sumQuery`.
 */
getStream(query) {
// Allocate this listener's id from the running counter (post-increment).
const id = this.lastListenerId++;
// The callback handed to makeStream unregisters the listener, subtracts its
// query from the aggregate, and resubscribes.
// Presumably makeStream invokes it when the stream is closed (the logs below
// mention "Ended stream" / "stream close") — TODO confirm.
// NOTE(review): `stream` is not used in the visible lines; the missing part
// of this method presumably returns it — confirm.
const [push, stream] = makeStream(() => {
delete this.listeners[id];
this.sumQuery = subtractQueries(this.sumQuery, query);
// console.log(this.id, 'Ended stream', id, query, this.sumQuery);
// console.log(this.id, 'Resubscribe due to stream close');
return this.resubscribe();
});
// Record the listener: the query as given, the result of linkKnown on the
// current data, and the push function from makeStream.
this.listeners[id] = {
originalQuery: query,
query: linkKnown(this.data, query),
push,
};
// Fold the new query into the aggregate before resubscribing.
this.sumQuery = addQueries(this.sumQuery, query);
// console.log(this.id, 'Started stream', id, query, this.sumQuery);
// console.log(this.id, 'Resubscribe due to query start');
this.resubscribe();
// NOTE(review): from here on the code appears to belong to a different
// function whose beginning is not visible — confirm.
merge(data, change);
// If getUnknown returns a truthy follow-up query, resolve it and merge the
// result into data.
const nextQuery = getUnknown(data, query);
if (nextQuery) {
const linked = await options.resolve(nextQuery);
merge(data, linked);
// When options.values is falsy, the resolved result is merged into change too.
if (!options.values) merge(change, linked);
}
data = getKnown(data, query);
// Push graft(data, query) (or {} when that is falsy) if options.values is
// truthy; otherwise push the accumulated change.
push(options.values ? graft(data, query) || {} : change);
};
// Create and return an Async Iterable.
// The initializer stores the push function it receives in the outer `push`
// variable and returns `signal`.
return makeStream(iterPush => {
push = iterPush;
return signal;
});
}
constructor(query, options) {
this.query = query;
this.options = options;
const [push, stream] = makeStream(options.onClose);
this.stream = stream;
this.push = push;
this.earlyChange = {};
this.init();
}
constructor(query, options) {
this.query = query;
this.options = options;
this.stream = makeStream(push => {
this.push = push;
return options.onClose;
});
this.earlyChange = {};
this.init();
}