Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
MemoryStream,
Subscription
} from 'xstream';
const producer = {
start: (listener) => {
listener.next(1);
listener.next(2);
listener.next(3);
listener.complete();
},
stop: console.log
};
const create: Stream = xs.create(producer);
const createWithMemory: MemoryStream = xs.createWithMemory(producer);
const never: Stream<*> = xs.never();
const empty: Stream<*> = xs.empty();
const _throw: Stream<*> = xs.throw(new Error(123));
const from1: Stream = xs.from([1]);
const from2: Stream = xs.from(Promise.resolve(1));
const of: Stream = xs.of(1);
const fromArray: Stream = xs.fromArray([1,2,3]);
const fromPromise: Stream = xs.from(Promise.resolve(1));
const periodic: Stream = xs.periodic(123);
const merge: Stream = xs.merge(of, of);
const merge2: Stream = xs.merge(of, of, of, of);
const combine: Stream = xs.combine(of, of);
const combine2: Stream = xs.combine(of, of, of, of);
const listener = {
next: console.log,
/**
 * Memory stream that receives the compiled client bundle as one string.
 *
 * The bundle is built once, when this expression is evaluated: './client.js'
 * is passed through browserify with the 'babelify' transform and a global
 * 'uglifyify' transform. Output chunks are concatenated as they arrive, and
 * the full string is pushed into the stream when the bundle output ends.
 */
const clientBundle$ = (() => {
  const compiled$ = xs.createWithMemory();
  let compiledSource = '';

  const configuredBundler = browserify()
    .transform('babelify')
    .transform({global: true}, 'uglifyify')
    .add('./client.js');
  const bundleOutput = configuredBundler.bundle();

  // Accumulate every chunk; string concatenation converts each one to text.
  bundleOutput.on('data', (chunk) => {
    compiledSource += chunk;
  });

  // Emit the finished bundle exactly once, after the last chunk.
  bundleOutput.on('end', () => {
    compiled$.shamefullySendNext(compiledSource);
    console.log('Client bundle successfully compiled.');
  });

  return compiled$;
})();
function restartableDriver (sink$, Time) {
const filteredSink$ = xs.create();
const lastSinkEvent$ = xs.createWithMemory();
if (sink$) {
if (isReplaying() && replayOnlyLastSink) {
lastSinkEvent$.map(lastEvent => finishedReplay$.mapTo(lastEvent)).flatten().take(1).addListener(subscribe((event) => {
filteredSink$.shamefullySendNext(event);
}));
}
if (pauseSinksWhileReplaying) {
sink$.compose(pausable(pause$)).filter(() => !isReplaying()).addListener({
next: (ev) => filteredSink$.shamefullySendNext(ev),
error: (err) => console.error(err),
complete: () => {}
});
} else {
sink$.compose(pausable(pause$)).addListener(subscribe((ev) => filteredSink$.shamefullySendNext(ev)));
return function(event = false) {
const stream$ = xs.createWithMemory({
eventListener: null,
start(listener) {
const onMessage = args => {
listener.next(JSON.parse(args));
};
if (!event) {
this.eventListener = client.onMessage(onMessage);
return;
}
client.send(
JSON.stringify({ event: 'listen', params: { chan: event } })
);
this.channel = client.channel(event);
this.eventListener = this.channel.onMessage(onMessage);
this.rosConnectionHook = () => {
if (this.isStreamLive) {
return;
}
this.stream = this.hasMemory
? xs.createWithMemory(this.producer)
: xs.create(this.producer);
this.isStreamLive = true;
this.listeners.forEach(listener => {
this.stream.addListener(listener);
});
};
this.ros.on('connection', this.rosConnectionHook);
function requestToResponse(
requestOptions: RequestOptions
): MemoryStream {
let request: superagent.Request;
return xs.createWithMemory({
start(listener) {
request = optionsToSuperagent(requestOptions);
listener.next(Loading(0));
if (requestOptions.progress) {
request = request.on('progress', ev =>
listener.next(Loading(ev.percent || 0))
);
}
request.end(processResponse(requestOptions, listener));
},
stop() {
request.abort();
/**
 * Returns a memory stream that, while it has been started, feeds `stream`
 * (converted with `xs.fromObservable`) into the listener built by
 * `recordListener(currentTime, listener)`. The result is passed through
 * `adapt` before being returned.
 *
 * The recorder is removed from the source again when the producer is
 * stopped, so a stopped recording does not keep the source subscribed and a
 * later restart does not end up with two recorders feeding the same listener.
 *
 * @param stream Source to record.
 * @returns The adapted recording stream.
 */
return function record(stream: Stream): Stream {
  // Detaches the recorder installed by the most recent `start`; a no-op
  // while the producer is idle.
  let detachRecorder = () => {};

  const recordedStream = xs.createWithMemory({
    start(listener) {
      const source = xs.fromObservable(stream);
      const recorder = recordListener(currentTime, listener);
      source.addListener(recorder);
      detachRecorder = () => source.removeListener(recorder);
    },
    stop() {
      detachRecorder();
      // Reset so a repeated `stop` cannot detach twice.
      detachRecorder = () => {};
    },
  });
  return adapt(recordedStream);
};
}
function socialValue$(id, key, defaultValue) {
if (!ref.isLink(id)) throw new Error('About requires an ssb ref!');
return xs.createWithMemory({
start(listener) {
listener.next(defaultValue);
this.sink = pull.drain(listener.next.bind(listener));
pull(api.sbot.pull.aboutSocialValueStream({key, dest: id}), this.sink);
},
stop() {
if (this.sink) this.sink.abort(true);
},
});
}
};