Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_coro(self, event_loop):
    """Awaiting a ``LazyCoroutine`` built from a coroutine function yields its result."""

    async def answer():
        # Yield to the event loop once before producing the value.
        await asyncio.sleep(0)
        return 42

    lazy = LazyCoroutine(answer)
    result = await lazy
    assert result == 42
) -> DrainResult:
if not self.connection.basic_nack:
raise exc.MethodNotImplemented
self.writer.write(
pamqp.frame.marshal(
spec.Basic.Nack(
delivery_tag=delivery_tag,
multiple=multiple,
requeue=requeue,
),
self.number,
)
)
return LazyCoroutine(self.connection.drain)
def basic_reject(self, delivery_tag, *, requeue=True) -> DrainResult:
    """Write a ``Basic.Reject`` frame for ``delivery_tag`` to the writer.

    The frame is marshalled together with ``self.number`` and passed to
    ``self.writer.write``; ``requeue`` is forwarded to the frame unchanged.

    Returns:
        A ``LazyCoroutine`` wrapping ``self.connection.drain``.
    """
    reject_frame = spec.Basic.Reject(
        delivery_tag=delivery_tag,
        requeue=requeue,
    )
    payload = pamqp.frame.marshal(reject_frame, self.number)
    self.writer.write(payload)
    return LazyCoroutine(self.connection.drain)
def basic_ack(self, delivery_tag, multiple=False) -> DrainResult:
    """Write a ``Basic.Ack`` frame for ``delivery_tag`` to the writer.

    The frame is marshalled together with ``self.number`` and passed to
    ``self.writer.write``; ``multiple`` is forwarded to the frame unchanged.

    Returns:
        A ``LazyCoroutine`` wrapping ``self.connection.drain``.
    """
    ack_frame = spec.Basic.Ack(
        delivery_tag=delivery_tag,
        multiple=multiple,
    )
    payload = pamqp.frame.marshal(ack_frame, self.number)
    self.writer.write(payload)
    return LazyCoroutine(self.connection.drain)