Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
/**
 * Stores the continuation factory and scheduler, marks this instance as
 * active, and immediately subscribes this instance to `source`.
 *
 * @param f - Zero-argument factory returning a `Stream<A>`. It is only stored
 *   here; presumably it is invoked elsewhere in the class once `source`
 *   ends (not visible in this excerpt).
 * @param source - Stream that is run right away with this instance as its sink.
 * @param sink - Downstream sink, forwarded to the base-class constructor.
 * @param scheduler - Scheduler handed to `source.run` and kept for later use.
 */
constructor(f: () => Stream<A>, source: Stream<A>, sink: Sink<A>, scheduler: Scheduler) {
  super(sink)
  this.f = f
  this.scheduler = scheduler
  // Flag is raised before subscribing to the source.
  this.active = true
  // disposeOnce wraps the subscription's disposable (presumably so that
  // repeated dispose calls only tear it down once).
  this.disposable = disposeOnce(source.run(this, scheduler))
}
/**
 * Registers `sink` with this instance and, when `this.add` reports 1,
 * starts the underlying source with this instance as its sink.
 *
 * @param sink - Sink to register via `this.add`.
 * @param scheduler - Scheduler passed to `this.source.run` when the source
 *   is started.
 * @returns A `disposeOnce`-wrapped `MulticastDisposable` built from this
 *   instance and `sink`.
 */
run(sink: Sink<A>, scheduler: Scheduler): Disposable {
  // Presumably `add` returns the number of registered sinks after insertion
  // (TODO confirm against its definition).
  const n = this.add(sink)
  if (n === 1) {
    // Only this case starts the source; its disposable is kept on the instance.
    this.disposable = this.source.run(this, scheduler)
  }
  return disposeOnce(new MulticastDisposable(this, sink))
}
/**
 * Initializes the bookkeeping state, subscribes this instance to `source`,
 * and then marks the instance as active.
 *
 * @param f - Function mapping a value of type `A` to a `Stream<B>`; only
 *   stored here.
 * @param concurrency - Stored as-is; presumably the maximum number of inner
 *   streams run at the same time (TODO confirm against the rest of the class).
 * @param source - Stream that is run right away with this instance as its sink.
 * @param sink - Downstream sink, stored on the instance.
 * @param scheduler - Scheduler handed to `source.run` and kept for later use.
 */
constructor(f: (a: A) => Stream<B>, concurrency: number, source: Stream<A>, sink: Sink<B>, scheduler: Scheduler) {
  this.f = f
  this.concurrency = concurrency
  this.sink = sink
  this.scheduler = scheduler
  // Both collections start empty.
  this.pending = []
  this.current = []
  this.disposable = disposeOnce(source.run(this, scheduler))
  // NOTE(review): `active` is assigned only after `source.run` returns, so any
  // callback the source makes synchronously during `run` would see it unset,
  // and any change such a callback makes to it is overwritten here. Confirm
  // that sources never call back synchronously.
  this.active = true
}