Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
function subscribeToOrderbookUncached(horizonURL: string, sellingAsset: string, buyingAsset: string) {
const buying = parseAssetID(buyingAsset)
const selling = parseAssetID(sellingAsset)
const query = createOrderbookQuery(selling, buying)
if (selling.equals(buying)) {
return Observable.from([createEmptyOrderbookRecord(buying, buying)])
}
const createURL = () => String(new URL(`/order_book?${qs.stringify({ ...query, cursor: "now" })}`, horizonURL))
const fetchUpdate = () => fetchOrderbookRecord(horizonURL, sellingAsset, buyingAsset)
let latestKnownSnapshot = ""
// TODO: Optimize - Make UpdateT = ValueT & { [$snapshot]: string }
return subscribeToUpdatesAndPoll({
async applyUpdate(update) {
latestKnownSnapshot = JSON.stringify(update)
return update
},
fetchUpdate,
async init() {
/**
 * Looks up the observable stored for `selector` and returns it; when nothing
 * is stored yet, builds one from the `observe` factory, stores it and returns it.
 *
 * @param selector Value passed to `createCacheKey()` to derive the lookup key.
 * @param observe Factory that is called only when no entry exists for the key.
 * @returns The previously stored entry, or the freshly multicasted observable.
 */
observe(selector: SelectorT, observe: () => ObservableLike) {
  const key = createCacheKey(selector)
  const existing = observables.get(key)

  if (existing) {
    return existing
  }

  // Nothing stored for this key yet: wrap the source with multicast() so the
  // existing subscription is re-used instead of setting up listeners over
  // and over again.
  const shared = multicast(observe())
  // TODO: Check if this value is actually newer than the old one
  observables.set(key, shared)
  return shared
}
}
// Passes `countToFive` to expose(). The function returns an Observable whose
// subscriber function emits the integers 1 through 5 in ascending order and
// then signals completion.
expose(function countToFive() {
  return new Observable(observer => {
    let counter = 1
    while (counter <= 5) {
      observer.next(counter)
      counter += 1
    }
    observer.complete()
  })
})
function createEventObservable(worker: WorkerType, workerTermination: Promise): Observable {
return new Observable(observer => {
const messageHandler = ((messageEvent: MessageEvent) => {
const workerEvent: WorkerMessageEvent = {
type: WorkerEventType.message,
data: messageEvent.data
}
observer.next(workerEvent)
}) as EventListener
const rejectionHandler = ((errorEvent: PromiseRejectionEvent) => {
debugThreadUtils("Unhandled promise rejection event in thread:", errorEvent)
const workerEvent: WorkerInternalErrorEvent = {
type: WorkerEventType.internalError,
error: Error(errorEvent.reason)
}
observer.next(workerEvent)
}) as EventListener
worker.addEventListener("message", messageHandler)
function createObservableForJob(worker: WorkerType, jobUID: number): Observable {
return new Observable(observer => {
let asyncType: "observable" | "promise" | undefined
const messageHandler = ((event: MessageEvent) => {
debugMessages("Message from worker:", event.data)
if (!event.data || event.data.uid !== jobUID) return
if (isJobStartMessage(event.data)) {
asyncType = event.data.resultType
} else if (isJobResultMessage(event.data)) {
if (asyncType === "promise") {
if (typeof event.data.payload !== "undefined") {
observer.next(event.data.payload)
}
observer.complete()
worker.removeEventListener("message", messageHandler)
} else {
export function subscribeToUpdatesAndPoll(
implementation: SubscriberImplementation,
options?: { retryFetchOnNoUpdate: boolean }
): Observable {
const { retryFetchOnNoUpdate = true } = options || {}
return multicast(
new Observable(observer => {
let cancelled = false
let unsubscribe = () => {
cancelled = true
}
const handleUnexpectedError = (error: Error) => {
try {
if (implementation.handleError) {
implementation.handleError(error)
} else {
throw error
}
} catch (error) {
observer.error(error)
}
return ((...rawArgs: Args) => {
const uid = nextJobUID++
const { args, transferables } = prepareArguments(rawArgs)
const runMessage: MasterJobRunMessage = {
type: MasterMessageType.run,
uid,
method,
args
}
debugMessages("Sending command to run function to worker:", runMessage)
worker.postMessage(runMessage, transferables)
return ObservablePromise.from(multicast(createObservableForJob(worker, uid)))
}) as any as ProxyableFunction
}
export function subscribeToUpdatesAndPoll(
implementation: SubscriberImplementation,
options?: { retryFetchOnNoUpdate: boolean }
): Observable {
const { retryFetchOnNoUpdate = true } = options || {}
return multicast(
new Observable(observer => {
let cancelled = false
let unsubscribe = () => {
cancelled = true
}
const handleUnexpectedError = (error: Error) => {
try {
if (implementation.handleError) {
implementation.handleError(error)
} else {
throw error
}
} catch (error) {
observer.error(error)
/**
 * Sets up the pool: normalizes the options, creates the debug logger, spawns
 * the workers and wires up the event observable. Once every worker's `init`
 * promise has settled successfully an `initialized` event is emitted; if any
 * of them rejects, the error is logged, forwarded to the event subject and
 * recorded in `initErrors`.
 *
 * @param spawnWorker Factory handed to `spawnWorkers()` together with the pool size.
 * @param optionsOrSize Either the pool size as a number, or an options object.
 *                      When omitted (or falsy), an empty options object is used.
 */
constructor(
  spawnWorker: () => Promise,
  optionsOrSize?: number | PoolOptions
) {
  // A bare number is shorthand for `{ size: <number> }`.
  let options: PoolOptions
  if (typeof optionsOrSize === "number") {
    options = { size: optionsOrSize }
  } else {
    options = optionsOrSize || {}
  }

  // Fall back to the default size only when no size was given at all.
  const requestedSize = options.size
  const size = requestedSize === undefined ? defaultPoolSize : requestedSize

  // Unnamed pools are labelled with the next pool ID; the counter is only
  // advanced when no name was provided.
  const poolLabel = slugify(options.name || String(nextPoolID++))

  this.debug = DebugLogger(`threads:pool:${poolLabel}`)
  this.options = options
  this.workers = spawnWorkers(spawnWorker, size)

  this.eventObservable = multicast(Observable.from(this.eventSubject))

  const pendingInits = this.workers.map(worker => worker.init)
  Promise.all(pendingInits).then(
    () => this.eventSubject.next({
      type: PoolEventType.initialized,
      size: this.workers.length
    }),
    error => {
      this.debug("Error while initializing pool worker:", error)
      this.eventSubject.error(error)
      this.initErrors.push(error)
    }
  )
}
}).flatMap((txs: Horizon.TransactionResponse[]) => Observable.from(txs))
}