Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def create():
server = yield from aiozmq.rpc.serve_rpc(
MyHandler(),
bind='tcp://127.0.0.1:{}'.format(port),
loop=self.loop)
client = yield from aiozmq.rpc.connect_rpc(
connect='tcp://127.0.0.1:{}'.format(port),
loop=self.loop)
return client, server
def go():
server = yield from aiozmq.rpc.serve_rpc(
ServerHandler(), bind='tcp://*:*')
server_addr = next(iter(server.transport.bindings()))
client = yield from aiozmq.rpc.connect_rpc(
connect=server_addr)
ret = yield from client.call.remote_func(1, 2)
assert 3 == ret
server.close()
client.close()
def go():
server = yield from aiozmq.rpc.serve_rpc(
DynamicHandler(), bind='tcp://*:*')
server_addr = next(iter(server.transport.bindings()))
client = yield from aiozmq.rpc.connect_rpc(
connect=server_addr)
ret = yield from client.call.func()
assert ((), 'val') == ret, ret
ret = yield from client.call.a.func()
assert (('a',), 'val') == ret, ret
ret = yield from client.call.a.b.func()
assert (('a', 'b'), 'val') == ret, ret
server.close()
def go():
server = yield from aiozmq.rpc.serve_rpc(
ServerHandler(), bind='tcp://*:*')
server_addr = next(iter(server.transport.bindings()))
client = yield from aiozmq.rpc.connect_rpc(
connect=server_addr)
try:
yield from client.call.unknown_function()
except aiozmq.rpc.NotFoundError as exc:
print("client.rpc.unknown_function(): {}".format(exc))
try:
yield from client.call.remote_func(bad_arg=1)
except aiozmq.rpc.ParametersError as exc:
print("client.rpc.remote_func(bad_arg=1): {}".format(exc))
def align_server(index, port, aligner, opts=''):
bind = 'tcp://127.0.0.1:{}'.format(port)
if aligner == 'minimap':
server = yield from rpc.serve_rpc(
MiniMapServe(index[0], map_opts=opts), bind=bind
)
else:
raise ValueError("Unknown aligner '{}'.".format(aligner))
return server
def go():
server = yield from aiozmq.rpc.serve_rpc(
ServerHandler(), bind='tcp://*:*',
translation_table=translation_table)
server_addr = next(iter(server.transport.bindings()))
client = yield from aiozmq.rpc.connect_rpc(
connect=server_addr,
translation_table=translation_table)
ret = yield from client.call.remote(Point(1, 2))
assert ret == Point(1, 2)
server.close()
client.close()
def go():
server = yield from aiozmq.rpc.serve_rpc(
Handler('ident'), bind='tcp://*:*')
server_addr = next(iter(server.transport.bindings()))
client = yield from aiozmq.rpc.connect_rpc(
connect=server_addr)
ret = yield from client.call.a()
assert ('ident', 'a') == ret
ret = yield from client.call.subhandler.b()
assert ('ident', 'subident', 'b') == ret
server.close()
client.close()