Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
return new Promise((resolve, reject) => {
const client = new RSocketClient({
serializers: JsonSerializers,
setup: {
dataMimeType: 'application/json',
keepAlive: 100000,
lifetime: 100000,
metadataMimeType: 'application/json',
},
transport: new RSocketWebSocketClient({ url }),
});
client.connect().subscribe({
onComplete: (socket: any) => {
// console.log('Connected to ' + url);
resolve(socket);
},
onError: (error: any) => {
// console.log('Err', error);
/**
 * Starts the gateway's RSocket server.
 *
 * When the gateway is already flagged as started, a warning is emitted and
 * nothing else happens. Otherwise `opts.serviceCall` is validated, a new
 * RSocketServer is created on `this.transport`, the server is started and the
 * gateway is flagged as started.
 *
 * @param opts start options; only `serviceCall` is read here.
 */
public start(opts: GatewayStartOptions) {
  if (this.started) {
    this.warn('Gateway is already started');
    return;
  }

  const serviceCall = opts.serviceCall;
  validateServiceCall(serviceCall);

  this.server = new RSocketServer({
    serializers: JsonSerializers,
    // Invoked with the socket; a fresh handler object is returned on every call.
    getRequestHandler: (socket) => {
      const onRequestResponse = (payload: RsocketEventsPayload) => {
        return requestResponse(payload, serviceCall, this.requestResponse);
      };
      const onRequestStream = (payload: RsocketEventsPayload) => {
        return requestStream(payload, serviceCall, this.requestStream);
      };
      return {
        requestResponse: onRequestResponse,
        requestStream: onRequestStream,
      };
    },
    transport: this.transport,
  });
  this.server.start();
  // console.log('Gateway started on port: ' + this.port);

  // Flag is only set once the server start call has returned.
  this.started = true;
}
async function run(options) {
const serverOptions = {
host: options.host,
port: options.port,
};
if (!isClient) {
const deferred = new Deferred();
const server = new RSocketServer({
getRequestHandler: socket => {
runOperation(socket, options);
return new SymmetricResponder();
},
transport: getServerTransport(options.protocol, serverOptions),
});
server.start();
console.log(`Server started on ${options.host}:${options.port}`);
return deferred.getPromise();
} else {
console.log(`Client connecting to ${options.host}:${options.port}`);
// $FlowFixMe
const socket: ReactiveSocket = await connect(
options.protocol,
serverOptions,
async function connect(options: Options): Promise> {
const client = new RSocketClient({
setup: {
dataMimeType: 'text/plain',
keepAlive: 1000000, // avoid sending during test
lifetime: 100000,
metadataMimeType: 'text/plain',
},
transport: new RSocketTcpClient({
host: options.host,
port: options.port,
}),
});
return new Promise((resolve, reject) => {
client.connect().subscribe({
onComplete: resolve,
onError: reject,
});
/**
 * Builds an RSocketClient for the given protocol and server options and
 * starts connecting.
 *
 * The client uses 'text/plain' for both data and metadata, a
 * SymmetricResponder as its responder, and the transport produced by
 * `getClientTransport(protocol, options)`.
 *
 * @param protocol passed through to `getClientTransport`.
 * @param options passed through to `getClientTransport`.
 * @returns whatever `RSocketClient#connect()` returns.
 */
function connect(protocol: string, options: ServerOptions) {
  const setup = {
    dataMimeType: 'text/plain',
    keepAlive: 1000000, // avoid sending during test
    lifetime: 100000,
    metadataMimeType: 'text/plain',
  };
  // Responder is created before the transport, as in the config literal order.
  const responder = new SymmetricResponder();
  const transport = getClientTransport(protocol, options);

  return new RSocketClient({setup, responder, transport}).connect();
}
TEXT_PLAIN,
MESSAGE_RSOCKET_COMPOSITE_METADATA,
MESSAGE_RSOCKET_ROUTING,
} from 'rsocket-core';
import RSocketWebSocketClient from 'rsocket-websocket-client';
import WebSocket from 'ws';
// 2^31 - 1, written in hex.
const maxRSocketRequestN = 0x7fffffff;

// NOTE(review): host/port are not referenced by the transport below, which
// hard-codes its own URL — confirm whether they are used elsewhere.
const host = '127.0.0.1';
const port = 7000;

// Values handed to the client's `setup` section below.
const keepAlive = 60 * 1000;
const lifetime = 3 * 60 * 1000;
const dataMimeType = 'application/octet-stream';
const metadataMimeType = MESSAGE_RSOCKET_COMPOSITE_METADATA.string;

// Client over a WebSocket transport with BufferEncoders and debug enabled.
const client = new RSocketClient({
  setup: {
    keepAlive: keepAlive,
    lifetime: lifetime,
    dataMimeType: dataMimeType,
    metadataMimeType: metadataMimeType,
  },
  transport: new RSocketWebSocketClient(
    {
      wsCreator: () => {
        return new WebSocket('ws://localhost:7000');
      },
      debug: true,
    },
    BufferEncoders,
  ),
});
// Open the connection
client.connect().then(socket => {
socket
.requestStream({
constructor(url, responder) {
this.client = new RSocketClient({
serializers: {
data: JsonSerializer,
metadata: JsonMetadataSerializer,
},
setup: {
// ms btw sending keepalive to server
keepAlive: 10000,
// ms timeout if no keepalive response
lifetime: 20000,
dataMimeType: 'application/json',
metadataMimeType: JsonMetadataSerializer.MIME_TYPE,
},
transport: new RSocketWebSocketClient({url: url}),
responder: responder
});
}
initRsocketWebSocket() {
// Create an instance of a client
const client = new RSocketClient({
//serializers: JsonSerializers,
setup: {
// ms btw sending keepalive to server
keepAlive: 60000,
// ms timeout if no keepalive response
lifetime: 180000,
// // format of `data`
dataMimeType: 'application/json',
// format of `metadata`
metadataMimeType: 'x.rsocket.routing.v0',
},
transport: new RSocketWebSocketClient({url: 'ws://localhost:8088/rsocket'}),
});
// Open the connection
client.connect().subscribe({
export function encodeMetadata(
service: string,
method: string,
tracing: Encodable,
metadata: Encodable,
): Buffer {
const serviceLength = UTF8Encoder.byteLength(service);
const methodLength = UTF8Encoder.byteLength(method);
const metadataLength = BufferEncoder.byteLength(metadata);
// We can't overload the method call directly and the code generator currently only populates
// the first 3 parameters
if (undefined === tracing) {
tracing = createBuffer(0);
}
const tracingLength = BufferEncoder.byteLength(tracing);
const buffer = createBuffer(
VERSION_SIZE +
SERVICE_LENGTH_SIZE +
serviceLength +
METHOD_LENGTH_SIZE +
methodLength +
TRACING_LENGTH_SIZE +
tracingLength +
metadataLength,
);
let offset = buffer.writeUInt16BE(VERSION, 0);
service: string,
method: string,
tracing: Encodable,
metadata: Encodable,
): Buffer {
const serviceLength = UTF8Encoder.byteLength(service);
const methodLength = UTF8Encoder.byteLength(method);
const metadataLength = BufferEncoder.byteLength(metadata);
// We can't overload the method call directly and the code generator currently only populates
// the first 3 parameters
if (undefined === tracing) {
tracing = createBuffer(0);
}
const tracingLength = BufferEncoder.byteLength(tracing);
const buffer = createBuffer(
VERSION_SIZE +
SERVICE_LENGTH_SIZE +
serviceLength +
METHOD_LENGTH_SIZE +
methodLength +
TRACING_LENGTH_SIZE +
tracingLength +
metadataLength,
);
let offset = buffer.writeUInt16BE(VERSION, 0);
offset = buffer.writeUInt16BE(serviceLength, offset);
offset = UTF8Encoder.encode(service, buffer, offset, offset + serviceLength);
offset = buffer.writeUInt16BE(methodLength, offset);