Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async function run(options) {
const serverOptions = {
host: options.host,
port: options.port,
};
if (!isClient) {
const deferred = new Deferred();
const server = new RSocketServer({
getRequestHandler: socket => {
runOperation(socket, options);
return new SymmetricResponder();
},
transport: getServerTransport(options.protocol, serverOptions),
});
server.start();
console.log(`Server started on ${options.host}:${options.port}`);
return deferred.getPromise();
} else {
console.log(`Client connecting to ${options.host}:${options.port}`);
// $FlowFixMe
const socket: ReactiveSocket = await connect(
options.protocol,
constructor(socket: ws.Socket, encoders: ?Encoders<*>) {
this._active = true;
this._close = new Deferred();
this._encoders = encoders;
this._socket = socket;
this._statusSubscribers = new Set();
if (socket) {
this._status = CONNECTION_STATUS.CONNECTED;
} else {
this._status = CONNECTION_STATUS.NOT_CONNECTED;
}
// If _receiver has been `subscribe()`-ed already
let isSubscribed = false;
this._receiver = new Flowable(subscriber => {
invariant(
!isSubscribed,
'RSocketWebSocketServer: Multicast receive() is not supported. Be sure ' +
function runOperation(socket, options) {
const deferred = new Deferred();
let subscription: ISubscription;
doOperation(socket, options.operation, options.payload).subscribe({
onComplete() {
console.log('onComplete()');
deferred.resolve();
},
onError(error) {
console.log('onError(%s)', error.message);
deferred.reject(error);
},
onNext(payload) {
console.log('onNext(%s)', payload.data);
},
onSubscribe(_subscription) {
subscription = _subscription;
subscription.request(MAX_STREAM_ID);
awaitN(n: number): Promise {
this._payloadCount = n;
this._payloadDefer = new Deferred();
return this._payloadDefer.getPromise();
}
constructor(log: Function) {
this._cancelled = false;
this._completeDefer = new Deferred();
this._completed = false;
this._errorDefer = new Deferred();
this._errored = false;
this._log = log;
this._payloadCount = null;
this._payloadDefer = null;
this._payloads = [];
this._subscription = null;
}
constructor(log: Function) {
this._cancelled = false;
this._completeDefer = new Deferred();
this._completed = false;
this._errorDefer = new Deferred();
this._errored = false;
this._log = log;
this._payloadCount = null;
this._payloadDefer = null;
this._payloads = [];
this._subscription = null;
}
export function genMockConnection() {
const deferred = new Deferred();
const receiver = genMockPublisher();
const status = genMockPublisher();
let closed = false;
const connection = {
close: jest.fn(() => {
connection.mock.close();
}),
connect: jest.fn(),
connectionStatus: jest.fn(() => status),
onClose: jest.fn(() => {
return deferred.getPromise();
}),
receive: jest.fn(() => receiver),
send: jest.fn(frames => {
connection.send.mock.frames = frames;
constructor(log: Function) {
this._cancelled = false;
this._completeDefer = new Deferred();
this._completed = false;
this._errorDefer = new Deferred();
this._errored = false;
this._log = log;
this._payloadCount = null;
this._payloadDefer = null;
this._payloads = [];
this._subscription = null;
}