Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
asyncio.get_event_loop call.
Returns Service instance.
"""
if loop is None:
loop = asyncio.get_event_loop()
transp, proto = yield from create_zmq_connection(
lambda: _ServerProtocol(loop, handler,
translation_table=translation_table,
log_exceptions=log_exceptions,
exclude_log_exceptions=exclude_log_exceptions,
timeout=timeout),
zmq.ROUTER, connect=connect, bind=bind, loop=loop)
return Service(loop, proto)
def __init__(self, loop, proto):
    """Initialise the instance by delegating to the parent constructor.

    Both loop and proto are forwarded to the base class unchanged.
    """
    super().__init__(loop, proto)
def publish(self, topic):
    """Build a proxy object for dynamic PubSub calls on a topic.

    Usage:

        yield from client.publish('my_topic').ns.func(1, 2)

    topic may be None; otherwise it must be an instance of str or bytes.
    """
    protocol = self._proto
    return _MethodCall(protocol, topic)
class PubSubService(Service):
    """Service that can subscribe its transport to topics."""

    def subscribe(self, topic):
        """Subscribe the transport to the given topic.

        topic must be str or bytes; a str is encoded as UTF-8 before it
        is handed to the transport.

        Raises TypeError for any other type.
        """
        # Reject unsupported types up front so the happy path stays flat.
        if not isinstance(topic, (bytes, str)):
            raise TypeError('topic should be str or bytes, got {!r}'
                            .format(topic))
        if isinstance(topic, bytes):
            raw_topic = topic
        else:
            raw_topic = topic.encode('utf-8')
        self.transport.subscribe(raw_topic)
class _ClientProtocol(_BaseProtocol):
    """Client-side protocol whose calls complete without awaiting a reply."""

    def call(self, name, args, kwargs):
        """Write a call to the transport and return a finished future.

        The message consists of three frames: the UTF-8 encoded name,
        then args and kwargs, each serialised with self.packer.

        The returned future already holds the result None, because
        nothing is read back after the write.

        Raises ServiceClosedError when self.transport is None.
        """
        if self.transport is None:
            raise ServiceClosedError()
        # List elements are evaluated left to right: name, args, kwargs.
        frames = [
            name.encode('utf-8'),
            self.packer.packb(args),
            self.packer.packb(kwargs),
        ]
        self.transport.write(frames)
        done = asyncio.Future(loop=self.loop)
        done.set_result(None)
        return done
class PipelineClient(Service):
    """Client exposing dynamic Pipeline calls through the notify property."""

    def __init__(self, loop, proto):
        """Forward loop and proto to the parent constructor."""
        super().__init__(loop, proto)

    @property
    def notify(self):
        """Build a proxy object for dynamic Pipeline calls.

        Usage:

            yield from client.notify.ns.func(1, 2)
        """
        protocol = self._proto
        return _MethodCall(protocol)
class _ServerProtocol(_BaseServerProtocol):
asyncio.get_event_loop() call.
Returns Service instance.
"""
if loop is None:
loop = asyncio.get_event_loop()
trans, proto = yield from create_zmq_connection(
lambda: _ServerProtocol(loop, handler,
translation_table=translation_table,
log_exceptions=log_exceptions,
exclude_log_exceptions=exclude_log_exceptions,
timeout=timeout),
zmq.PULL, connect=connect, bind=bind, loop=loop)
return Service(loop, proto)
btopic = topic.encode('utf-8')
elif isinstance(topic, bytes):
btopic = topic
else:
raise TypeError('topic argument should be None, str or bytes '
'({!r})'.format(topic))
bname = name.encode('utf-8')
bargs = self.packer.packb(args)
bkwargs = self.packer.packb(kwargs)
self.transport.write([btopic, bname, bargs, bkwargs])
fut = asyncio.Future(loop=self.loop)
fut.set_result(None)
return fut
class PubSubClient(Service):
    """Client exposing dynamic PubSub calls through publish()."""

    def __init__(self, loop, proto):
        """Forward loop and proto to the parent constructor."""
        super().__init__(loop, proto)

    def publish(self, topic):
        """Build a proxy object for dynamic PubSub calls on a topic.

        Usage:

            yield from client.publish('my_topic').ns.func(1, 2)

        topic may be None; otherwise it must be an instance of str or bytes.
        """
        protocol = self._proto
        return _MethodCall(protocol, topic)
# NOTE(review): another class named PubSubService is defined earlier in this
# file; confirm which name is intended for this request/reply call logic.
class PubSubService(Service):
    def call(self, name, args, kwargs):
        """Send a call and return a future registered for its request id.

        The message has four frames: the header from self._new_id(), the
        UTF-8 encoded name, then args and kwargs serialised with
        self.packer.

        The future is stored in self.calls under the new request id and
        returned without a result; presumably it is completed elsewhere
        when the matching reply arrives (not visible here).

        Raises ServiceClosedError when self.transport is None.
        """
        if self.transport is None:
            raise ServiceClosedError()
        # Serialise first so a packing error does not consume a request id.
        payload = [
            name.encode('utf-8'),
            self.packer.packb(args),
            self.packer.packb(kwargs),
        ]
        header, req_id = self._new_id()
        assert req_id not in self.calls, (req_id, self.calls)
        pending = asyncio.Future(loop=self.loop)
        # Register before writing so the future exists once the request is out.
        self.calls[req_id] = pending
        self.transport.write([header] + payload)
        return pending
class RPCClient(Service):
def __init__(self, loop, proto, *, timeout):
    """Initialise the client and remember the timeout.

    loop and proto are forwarded to the parent constructor; timeout is
    keyword-only and is stored on the instance as _timeout.
    """
    super().__init__(loop, proto)
    self._timeout = timeout
@property
def call(self):
    """Build a proxy object for dynamic RPC calls.

    The proxy is created with this client's stored timeout.

    Usage:

        ret = yield from client.call.ns.func(1, 2)
    """
    protocol = self._proto
    return _MethodCall(protocol, timeout=self._timeout)
def with_timeout(self, timeout):
"""Return a new RPCClient instance with overriden timeout"""