Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
namespaces, sep, method = name.rpartition('.')
handler = self.handler
if namespaces:
for part in namespaces.split('.'):
try:
handler = handler[part]
except KeyError:
raise NotFoundError(name)
else:
if not isinstance(handler, AbstractHandler):
raise NotFoundError(name)
try:
func = handler[method]
except KeyError:
raise NotFoundError(name)
else:
if isinstance(func, MethodType):
holder = func.__func__
else:
holder = func
if not hasattr(holder, '__rpc__'):
raise NotFoundError(name)
return func
def msg_received(self, data):
try:
*pre, header, bname, bargs, bkwargs = data
pid, rnd, req_id, timestamp = self.REQ.unpack(header)
name = bname.decode('utf-8')
args = self.packer.unpackb(bargs)
kwargs = self.packer.unpackb(bkwargs)
except Exception:
logger.critical("Cannot unpack %r", data, exc_info=sys.exc_info())
return
try:
func = self.dispatch(name)
args, kwargs, ret_ann = self.check_args(func, args, kwargs)
except (NotFoundError, ParametersError) as exc:
fut = asyncio.Future(loop=self.loop)
fut.add_done_callback(partial(self.process_call_result,
req_id=req_id, pre=pre,
name=name, args=args, kwargs=kwargs))
fut.set_exception(exc)
else:
if asyncio.iscoroutinefunction(func):
fut = self.add_pending(func(*args, **kwargs))
else:
fut = asyncio.Future(loop=self.loop)
try:
fut.set_result(func(*args, **kwargs))
except Exception as exc:
fut.set_exception(exc)
fut.add_done_callback(partial(self.process_call_result,
req_id=req_id, pre=pre,
def process_call_result(self, fut, *, name, args, kwargs):
    """Inspect a finished handler future and log anything unexpected.

    The future is first removed from the pending set via
    ``self.discard_pending``.  After that:

    * a ``None`` result is the normal case and is silently accepted;
    * any other result is reported with a warning;
    * cancellation is ignored;
    * ``NotFoundError`` / ``ParametersError`` are logged with traceback;
    * every other exception is handed to ``self.try_log``.

    Args:
        fut: The completed future wrapping the handler call.
        name: Name of the handler that was invoked (used in log messages).
        args: Positional arguments of the call, forwarded to ``try_log``.
        kwargs: Keyword arguments of the call, forwarded to ``try_log``.
    """
    self.discard_pending(fut)
    try:
        outcome = fut.result()
        if outcome is None:
            return
        # Kept inside the ``try`` so a failure while logging is still
        # routed through the generic handler below.
        logger.warning("PubSub handler %r returned not None", name)
    except asyncio.CancelledError:
        # A cancelled call is not an error; nothing to report.
        pass
    except (NotFoundError, ParametersError) as err:
        logger.exception("Call to %r caused error: %r", name, err)
    except Exception:
        self.try_log(fut, name, args, kwargs)
def process_call_result(self, fut, *, name, args, kwargs):
    """Inspect a finished handler future and log anything unexpected.

    The future is first removed from the pending set via
    ``self.discard_pending``.  After that:

    * a ``None`` result is the normal case and is silently accepted;
    * any other result is reported with a warning;
    * ``NotFoundError`` / ``ParametersError`` are logged with traceback;
    * cancellation is ignored;
    * every other exception is handed to ``self.try_log``.

    Args:
        fut: The completed future wrapping the handler call.
        name: Name of the handler that was invoked (used in log messages).
        args: Positional arguments of the call, forwarded to ``try_log``.
        kwargs: Keyword arguments of the call, forwarded to ``try_log``.
    """
    self.discard_pending(fut)
    try:
        outcome = fut.result()
        if outcome is None:
            return
        # Kept inside the ``try`` so a failure while logging is still
        # routed through the generic handler below.
        logger.warning("Pipeline handler %r returned not None", name)
    except (NotFoundError, ParametersError) as err:
        logger.exception("Call to %r caused error: %r", name, err)
    except asyncio.CancelledError:
        # A cancelled call is not an error; nothing to report.
        pass
    except Exception:
        self.try_log(fut, name, args, kwargs)
async def RPCContext(addr, timeout=10):
preserved_exceptions = (
NotFoundError,
ParametersError,
asyncio.TimeoutError,
asyncio.CancelledError,
asyncio.InvalidStateError,
)
server = None
try:
server = await aiozmq.rpc.connect_rpc(
connect=addr, error_table={
'concurrent.futures._base.TimeoutError': asyncio.TimeoutError,
})
server.transport.setsockopt(zmq.LINGER, 50)
with _timeout(timeout):
yield server
except Exception:
exc_type, exc, tb = sys.exc_info()
async def RPCContext(addr, timeout=None):
preserved_exceptions = (
NotFoundError,
ParametersError,
asyncio.TimeoutError,
asyncio.CancelledError,
asyncio.InvalidStateError,
)
global agent_peers
peer = agent_peers.get(addr, None)
if peer is None:
peer = await aiozmq.rpc.connect_rpc(
connect=addr, error_table={
'concurrent.futures._base.TimeoutError': asyncio.TimeoutError,
})
peer.transport.setsockopt(zmq.LINGER, 1000)
agent_peers[addr] = peer
try:
with _timeout(timeout):