Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@aiozmq.rpc.method
def test2() -> 'bad annotation':
    """Do nothing and implicitly return ``None``.

    NOTE(review): the return annotation is a plain string literal rather
    than a type or callable -- presumably a deliberately invalid
    annotation used as a fixture; confirm against the caller.
    """
@aiozmq.rpc.method
@asyncio.coroutine
def custom_annotation(self, arg: my_checker):
    """Hand ``arg`` back to the caller unchanged.

    :param arg: value to echo; annotated with ``my_checker``.
    :returns: ``arg`` itself.

    NOTE(review): ``asyncio.coroutine`` was removed in Python 3.11 --
    confirm the interpreter version this code targets.
    """
    return arg
    # Never executed: the presence of ``yield`` is what turns this
    # function into a generator function.
    yield
@aiozmq.rpc.method
def remote(self, val):
    """Echo the received value.

    :param val: any value.
    :returns: ``val`` unchanged.
    """
    return val
@aiozmq.rpc.method
def remote(self, val):
    """Fail unconditionally with a ``CustomError`` built from ``val``.

    :param val: value passed as the sole argument to ``CustomError``.
    :raises CustomError: always.
    """
    failure = CustomError(val)
    raise failure
@aiozmq.rpc.method
def handle_some_event(self, a: int, b: int):
    """Accept two integer-annotated arguments and do nothing with them.

    :param a: first argument (unused).
    :param b: second argument (unused).
    :returns: ``None``.
    """
@aiozmq.rpc.method
def remote_func(self, a: int, b: int) -> int:
    """Add the two arguments.

    :param a: left operand.
    :param b: right operand.
    :returns: the result of ``a + b``.
    """
    total = a + b
    return total
@rpc.method
def time_saved(self):
    """Return the combined ``time_saved`` of every replay channel.

    This is the difference between current real time and simulated time,
    i.e. how far the channels have been fast-forwarded.

    :returns: sum of ``time_saved`` over ``self.replay_channels`` values;
        ``0`` when there are no channels.
    """
    total = 0
    for channel_state in self.replay_channels.values():
        total = total + channel_state.time_saved
    return total
@aiozmq.rpc.method
async def shutdown_agent(self, terminate_kernels: bool):
    """Placeholder handler: emit a debug log line and return ``None``.

    :param terminate_kernels: currently unused by this stub.
    """
    # TODO: implement
    log.debug('rpc::shutdown_agent()')
@rpc.method
def get_raw(self, channel, seconds=0.5):
    """Fetch raw data from the start of the current read on a channel.

    :param channel: key into ``self.replay_channels`` selecting the
        channel for which to get raw data.
    :param seconds: forwarded unchanged as the ``seconds`` keyword of the
        channel's own ``get_raw`` (presumably a duration -- confirm).
    :returns: Serialized raw data, see :class:`Fast5Data`.
    :raises KeyError: if ``replay_channels`` is a plain mapping and
        ``channel`` is not in it -- TODO confirm the container type.
    """
    selected = self.replay_channels[channel]
    return selected.get_raw(seconds=seconds)
@aiozmq.rpc.method
def a():
    """Return the constant one-character string ``'a'``."""
    return 'a'