Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
/**
 * Looks up the observable stored for `selector`, creating and storing one on a miss.
 *
 * @param selector Passed to `createCacheKey()` to derive the key used in `observables`.
 * @param observe Factory that is only invoked when no truthy entry is stored for the key;
 *   its result is wrapped with `multicast()` before being stored.
 * @returns The stored entry when present, otherwise the newly multicast observable.
 */
observe(selector: SelectorT, observe: () => ObservableLike) {
  const key = createCacheKey(selector)
  const existing = observables.get(key)

  // Hit: hand back what is already stored, without calling the factory.
  if (existing) return existing

  // Miss: multicast the source so the existing subscription gets re-used
  // instead of setting up listeners over and over again.
  const shared = multicast(observe())
  // TODO: Check if this value is actually newer than the old one
  observables.set(key, shared)
  return shared
}
}
// Returned proxy: every call posts a "run" message for `method` to `worker`
// and returns the job's observable wrapped via ObservablePromise.from().
return ((...rawArgs: Args) => {
  // Take the current job UID and advance the shared counter.
  const jobUID = nextJobUID++
  const prepared = prepareArguments(rawArgs)

  const runMessage: MasterJobRunMessage = {
    type: MasterMessageType.run,
    uid: jobUID,
    method,
    args: prepared.args
  }

  debugMessages("Sending command to run function to worker:", runMessage)
  // Second argument forwards whatever prepareArguments() reported as transferables.
  worker.postMessage(runMessage, prepared.transferables)

  // The observable for this job is created only after the message has been posted.
  const jobResults = multicast(createObservableForJob(worker, jobUID))
  return ObservablePromise.from(jobResults)
}) as any as ProxyableFunction
}
export function subscribeToUpdatesAndPoll(
implementation: SubscriberImplementation,
options?: { retryFetchOnNoUpdate: boolean }
): Observable {
const { retryFetchOnNoUpdate = true } = options || {}
return multicast(
new Observable(observer => {
let cancelled = false
let unsubscribe = () => {
cancelled = true
}
const handleUnexpectedError = (error: Error) => {
try {
if (implementation.handleError) {
implementation.handleError(error)
} else {
throw error
}
} catch (error) {
observer.error(error)