Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
run(sink: Sink<b>, scheduler: Scheduler): Disposable {
const l = this.sources.length
const disposables = new Array(l)
const sinks = new Array(l)
const mergeSink = new CombineSink(disposables, sinks.length, sink, this.f)
for (let indexSink, i = 0; i < l; ++i) {
indexSink = sinks[i] = new IndexSink(i, mergeSink)
disposables[i] = this.sources[i].run(indexSink, scheduler)
}
return disposeAll(disposables)
}
}</b>
run(sink: Sink, scheduler: Scheduler): Disposable {
const l = this.sources.length
const disposables = new Array(l)
const sinks = new Array(l)
const buffers = new Array(l)
const zipSink = new ZipSink(this.f, buffers, sinks, sink)
for (let indexSink, i = 0; i < l; ++i) {
buffers[i] = new Queue()
indexSink = sinks[i] = new IndexSink(i, zipSink)
disposables[i] = this.sources[i].run(indexSink, scheduler)
}
return disposeAll(disposables)
}
}
run(sink: Sink<a>, scheduler: Scheduler): Disposable {
const l = this.sources.length
const disposables: Disposable[] = new Array(l)
const sinks: Sink</a><a>[] = new Array(l)
const mergeSink = new MergeSink(disposables, sinks, sink)
for (let indexSink, i = 0; i < l; ++i) {
indexSink = sinks[i] = new IndexSink(i, mergeSink)
disposables[i] = this.sources[i].run(indexSink, scheduler)
}
return disposeAll(disposables)
}
}</a>
dispose(): void {
this.active = false
this.pending.length = 0
this.disposable.dispose()
disposeAll(this.current).dispose()
}