Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_awaitable(func, result, event_loop):
    """Check that the ``awaitable``-wrapped ``func`` produces ``result``.

    ``func`` is wrapped with ``awaitable``, invoked without arguments, and
    the awaited value is compared against ``result``. ``event_loop`` is
    accepted but not referenced in the body (presumably a test fixture
    supplying the running loop -- confirm against the test setup).
    """
    wrapped = awaitable(func)
    outcome = await wrapped()
    assert outcome == result
queue = await self.channel.declare_queue(method_name, **kwargs)
if func in self.consumer_tags:
raise RuntimeError('Function already registered')
if method_name in self.routes:
raise RuntimeError(
'Method name already used for %r' % self.routes[method_name]
)
self.consumer_tags[func] = await queue.consume(
partial(self.on_call_message, method_name)
)
self.routes[method_name] = awaitable(func)
self.queues[func] = queue
*,
no_ack: bool = False,
exclusive: bool = False,
arguments: ArgumentsType = None,
consumer_tag: str = None
) -> spec.Basic.ConsumeOk:
consumer_tag = consumer_tag or "ctag%i.%s" % (
self.number,
hexlify(os.urandom(16)).decode(),
)
if consumer_tag in self.consumers:
raise exc.DuplicateConsumerTag(self.number)
self.consumers[consumer_tag] = awaitable(consumer_callback)
# noinspection PyTypeChecker
return await self.rpc(
spec.Basic.Consume(
queue=queue,
no_ack=no_ack,
exclusive=exclusive,
consumer_tag=consumer_tag,
arguments=arguments,
)
async def create_worker(self, channel_name: str,
                        func: Callable, **kwargs) -> Worker:
    """Create a :class:`Worker` that consumes the ``channel_name`` queue.

    The queue is obtained from ``self.create_queue`` (extra keyword
    arguments are forwarded to it), ``func`` is registered as the message
    handler through ``self.on_message``, and the resulting consumer tag is
    packaged into the returned :class:`Worker` together with ``self.loop``.

    :param channel_name: name passed to ``self.create_queue``
    :param func: callable bound as the first argument of ``self.on_message``
    :param kwargs: forwarded unchanged to ``self.create_queue``
    :return: a :class:`Worker` built from the queue, consumer tag and loop
    """
    queue = await self.create_queue(channel_name, **kwargs)

    # Callables that already carry the ``_is_coroutine`` marker are used
    # as-is; anything else is passed through ``awaitable`` first.
    handler = func if hasattr(func, "_is_coroutine") else awaitable(func)

    consumer_tag = await queue.consume(
        partial(self.on_message, handler),
    )
    return Worker(queue, consumer_tag, self.loop)