Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def prot():
    """Create and return a new ``JSONRPCProtocol`` instance.

    The class is imported inside the function, so ``tinyrpc`` is only
    loaded when this function is actually called.
    """
    from tinyrpc.protocols.jsonrpc import JSONRPCProtocol as protocol_cls

    instance = protocol_cls()
    return instance
def protocol(request):
    """Return the protocol instance selected by ``request.param``.

    NOTE(review): this looks like a parametrized pytest fixture whose
    decorator is outside this view -- confirm against the test module.

    The original condition was ``if 'jsonrpc':``, which tests a non-empty
    string literal and is therefore always true; the ``RuntimeError`` below
    could never be raised. The selected name is now actually compared.

    :param request: object carrying the selected protocol name in its
        ``param`` attribute.
    :returns: a new ``JSONRPCProtocol`` when the name is ``'jsonrpc'``.
    :raises RuntimeError: if any other protocol name is requested.
    """
    # Default to 'jsonrpc' when the request carries no parameter, so callers
    # that never set one keep receiving the JSON-RPC protocol as before.
    name = getattr(request, 'param', 'jsonrpc')
    if name == 'jsonrpc':
        return JSONRPCProtocol()

    raise RuntimeError('Bad protocol name in test case')
def main(args):
    """Start ``./clef`` with the stdio UI and serve its RPC calls forever.

    :param args: command-line arguments; when the first one is ``"test"``
        the ``--stdio-ui-test`` flag is appended to the clef command.

    The function does not return: it ends in ``serve_forever()``.
    """
    command = ["./clef", "--stdio-ui"]
    wants_test_ui = len(args) > 0 and args[0] == "test"
    if wants_test_ui:
        command.append("--stdio-ui-test")
    print("cmd: {}".format(" ".join(command)))

    # Expose the StdIOHandler methods with an empty name prefix.
    handlers = RPCDispatcher()
    handlers.register_instance(StdIOHandler(), '')

    # bufsize=1 requests line buffering on the text-mode pipes.
    clef = subprocess.Popen(
        command,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        universal_newlines=True,
        bufsize=1,
    )

    # Read requests from clef's stdout and write replies to its stdin.
    transport = PipeTransport(clef.stdout, clef.stdin)
    server = RPCServer(transport, JSONRPCProtocol(), handlers)
    server.serve_forever()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from tinyrpc.protocols.jsonrpc import JSONRPCProtocol
from tinyrpc.transports.http import HttpPostClientTransport
from tinyrpc import RPCClient
# JSON-RPC client that sends its requests via HTTP POST to the local server.
rpc_client = RPCClient(
    JSONRPCProtocol(), HttpPostClientTransport('http://127.0.0.1:5000/')
)

# The proxy turns attribute access into remote method calls.
remote_server = rpc_client.get_proxy()

# Invoke the remote 'reverse_string' method with one string argument.
result = remote_server.reverse_string('Hello, World!')

print("Server answered:", result)
def __init__(self, ws):
    """Set up an RPC client that talks JSON-RPC over the websocket ``ws``.

    :param ws: websocket object; stored on ``self.ws`` and handed to the
        client transport together with a fresh ``hub.Queue``.
    """
    self.ws = ws
    self.queue = hub.Queue()

    # Build protocol first, then transport, then pass both to the base class.
    protocol = JSONRPCProtocol()
    transport = WebSocketClientTransport(ws, self.queue)
    super(WebSocketRPCClient, self).__init__(protocol, transport)
def __init__(self, *args, **kwargs) -> None:
    """Forward all arguments to the base initialiser, then set ``_id_counter`` to 0."""
    super().__init__(*args, **kwargs)
    self._id_counter = 0
def __init__(self, ws, rpc_callback):
    """Set up an RPC server that speaks JSON-RPC over the websocket ``ws``.

    :param ws: websocket object wrapped in a ``WebSocketServerTransport``.
    :param rpc_callback: object registered on a new ``RPCDispatcher`` via
        ``register_instance``.
    """
    callbacks = RPCDispatcher()
    callbacks.register_instance(rpc_callback)

    transport = WebSocketServerTransport(ws)
    protocol = JSONRPCProtocol()
    super(WebSocketRPCServer, self).__init__(transport, protocol, callbacks)
gevent.spawn(handle_client, message)
# Create a ROUTER socket and bind it to the local TCP endpoint.
context = zmq.Context()
socket = context.socket(zmq.ROUTER)
socket.bind("tcp://127.0.0.1:12345")

dispatcher = RPCDispatcher()


@dispatcher.public
def throw_up():
    """Return the string ``'asad'``.

    NOTE(review): the original body had ``raise Exception('BLARGH')`` after
    the ``return``; that statement could never run and has been removed, so
    runtime behaviour is unchanged. The function name suggests it may once
    have been meant to raise -- confirm the intent with the author.
    """
    return 'asad'


# Hand the bound socket, a JSON-RPC protocol and the dispatcher to the server.
rpc_server(socket, JSONRPCProtocol(), dispatcher)
def _to_dict(self):
    """Build the dictionary form of this request.

    The result always holds ``'jsonrpc'`` and ``'method'``. ``'params'`` is
    added when ``self.args`` or ``self.kwargs`` is non-empty; if both are,
    ``self.kwargs`` wins. ``'id'`` is added only when the request is not
    one-way and has a ``unique_id`` that is not ``None``.

    :returns: the request as a ``dict``.
    """
    payload = {
        'jsonrpc': JSONRPCProtocol.JSON_RPC_VERSION,
        'method': self.method,
    }

    # Later candidates overwrite earlier ones, so kwargs take precedence.
    for candidate in (self.args, self.kwargs):
        if candidate:
            payload['params'] = candidate

    if self.one_way:
        return payload

    request_id = getattr(self, 'unique_id', None)
    if request_id is not None:
        payload['id'] = request_id
    return payload