Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def _client_consumer(self, channel, client_future, task_status=trio.TASK_STATUS_IGNORED):
    """Consume messages from the client queue and hand them to ``client_future``.

    The consumer runs inside a cancel scope that is stored on
    ``self._client_scope``. ``task_status.started()`` is called as soon
    as the consumer has been opened.

    Each received ``(body, envelope, properties)`` triple is stored on
    ``client_future.test_result``, after which ``client_future.set()``
    is called.
    """
    # NOTE(review): newer trio releases provide ``trio.CancelScope`` in place
    # of ``trio.open_cancel_scope`` -- confirm which trio version is targeted.
    with trio.open_cancel_scope() as cancel_scope:
        self._client_scope = cancel_scope
        consumer = channel.new_consumer(queue_name=client_queue_name)
        async with consumer as messages:
            # Signal the starter only after the consumer is open.
            task_status.started()
            logger.debug('Client consuming messages')
            async for payload, env, props in messages:
                logger.debug('Client received message')
                client_future.test_result = (payload, env, props)
                client_future.set()
async def cancel(self):
    """
    Attempts to cancel this task.

    Does nothing when ``self.cancelled`` is already set. Otherwise the
    cancellation mechanism is picked from ``asynclib.lib_name`` and
    ``self.cancelled`` is set to ``True`` afterwards, including when the
    library name matches neither known backend.
    """
    if not self.cancelled:
        backend = asynclib.lib_name
        if backend == "curio":
            # capture the error!
            await self._internal_task.cancel()
        elif backend == "trio":
            # This is horrible: it relies on the private ``_add_task`` hook
            # of the cancel scope to attach the task before cancelling.
            import trio
            with trio.open_cancel_scope() as cancel_scope:
                cancel_scope._add_task(self._internal_task)
                cancel_scope.cancel()

        self.cancelled = True
async def __aexit__(self, *tb):
    """Cancel the nursery and close the connection.

    The close runs inside a shielded cancel scope, so the nursery
    cancellation issued just before does not interrupt it. Any exception
    raised while closing is logged at debug level and re-raised. In every
    case ``self.conn`` is reset to ``None`` and ``self._running`` to
    ``False``.

    Returns ``None``, so an exception from the ``async with`` body is not
    suppressed.
    """
    self.nursery.cancel_scope.cancel()
    with trio.open_cancel_scope(shield=True):
        try:
            connection = self.conn
            if connection is not None:
                await connection.aclose()
        except BaseException as err:
            logger.debug("Conn ended", exc_info=err)
            raise
        finally:
            # Always drop the connection reference and mark as stopped.
            self.conn = None
            self._running = False
# Reader task (excerpt: the body continues beyond the lines visible here).
# Opens a shielded cancel scope, stores it on self._reader_scope, reports
# startup through task_status.started() and then reads frames in an endless loop.
async def _reader_loop(self, task_status=trio.TASK_STATUS_IGNORED):
with trio.open_cancel_scope(shield=True) as scope:
self._reader_scope = scope
try:
task_status.started()
while True:
try:
# Without a stream there is nothing to read from.
if self._stream is None:
raise exceptions.AmqpClosedConnection
# Allow twice the server heartbeat for the next frame;
# a falsy heartbeat means no time limit at all.
if self.server_heartbeat:
timeout = self.server_heartbeat * 2
else:
timeout = inf
# trio.fail_after raises trio.TooSlowError when the read exceeds `timeout`.
with trio.fail_after(timeout):
try:
frame = await self.get_frame()
# Writer task (excerpt: the body continues beyond the lines visible here).
# Opens a shielded cancel scope, stores it on self._writer_scope, reports
# startup through task_status.started() and then loops until self.state is CLOSED.
async def _writer_loop(self, task_status=trio.TASK_STATUS_IGNORED):
with trio.open_cancel_scope(shield=True) as scope:
self._writer_scope = scope
task_status.started()
while self.state != CLOSED:
# Wait at most half the server heartbeat for something to send;
# a falsy heartbeat means no time limit at all.
if self.server_heartbeat:
timeout = self.server_heartbeat / 2
else:
timeout = inf
with trio.move_on_after(timeout) as timeout_scope:
frame, encoder = await self._send_receive_channel.receive()
# Nothing arrived before the timeout: send a heartbeat instead
# and go back to waiting.
if timeout_scope.cancelled_caught:
await self.send_heartbeat()
continue
f = frame.get_frame(encoder)
try:
raise
n = n[:i]
rpc = self.rpcs.get(n + '.#', None)
if rpc is not None:
break
await rpc.run(msg)
except KeyError:
logger.info(
"Unknown message %s %s on %s for %s: %s", mode, envelope.delivery_tag,
envelope.routing_key, routing_key, body
)
await channel.basic_reject(envelope.delivery_tag)
except BaseException:
with trio.open_cancel_scope(shield=True):
with suppress(AmqpClosedConnection):
await channel.basic_reject(envelope.delivery_tag)
raise
k = {}
else:
a = (msg,)
k = {}
if self.call_conv == CC_TASK:
await msg.conn.nursery.start(self._run, self.fn, msg)
else:
try:
res = await coro_wrapper(self.fn, *a, **k)
if res is not None:
await msg.reply(res)
except Exception as exc:
await msg.error(exc, _exit=self.debug)
finally:
with trio.open_cancel_scope(shield=True, deadline=trio.current_time() + 1):
with suppress(AmqpClosedConnection):
await msg.aclose()
async def _run_idle(self, task_status=trio.TASK_STATUS_IGNORED):
    """
    Run ``self.idle_proc()`` inside its own cancel scope.

    The scope is published on ``self._idle`` while the idle proc runs,
    so that it can be cancelled when the connection comes back.
    ``self._idle`` is reset to ``None`` however the idle proc ends.

    ``task_status`` is accepted but not used by this method.
    """
    try:
        with trio.open_cancel_scope() as idle_scope:
            self._idle = idle_scope
            await self.idle_proc()
    finally:
        # Clear the handle whether the proc finished, failed or was cancelled.
        self._idle = None
exc = envelope.exchange_name
if exc.startswith("dead"):
exc = properties.headers['x-death'][0]['exchange']
exc = DeadLettered(exc, envelope.routing_key)
if reply_to is None:
# usually, this is no big deal: call debug(), not exception().
logger.debug("Undeliverable one-way message", exc_info=exc)
return
reply.set_error(exc, envelope.routing_key)
reply, props = reply.dump(self, codec=self.codec)
logger.debug("DeadLetter %s to %s", envelope.routing_key, self._ch_reply.exchange)
await self._ch_reply.channel.publish(
reply, self._ch_reply.exchange, reply_to, properties=props
)
finally:
with trio.open_cancel_scope(shield=True, deadline=trio.current_time() + 1):
await channel.basic_client_ack(envelope.delivery_tag)
async def __aexit__(self, *tb):
with trio.open_cancel_scope(shield=True):
try:
await self.channel.basic_cancel(self.consumer_tag)
except AmqpClosedConnection:
pass
del self._chan_send
del self._chan_receive
# these messages are not acknowledged, thus deleting the queue will