Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_auth_plain(amqp_connection, event_loop):
    """Check the payload returned by ``PlainAuth.marshal()``.

    Three things are verified:

    * marshalling the same connection twice gives equal payloads, and the
      payload splits on NUL into an empty field followed by the ``guest``
      user and ``guest`` password the test expects from the fixture;
    * a connection whose URL carries ``foo``/``bar`` marshals those values;
    * assigning ``value`` on a ``PlainAuth`` instance makes ``marshal()``
      return exactly that value.
    """
    separator = b"\x00"

    default_payload = PlainAuth(amqp_connection).marshal()
    assert default_payload == PlainAuth(amqp_connection).marshal()
    assert default_payload.split(separator) == [b"", b"guest", b"guest"]

    # Same broker URL, different credentials; the connection is never opened.
    custom_url = amqp_connection.url.with_user("foo").with_password("bar")
    custom_connection = aiormq.Connection(custom_url, loop=event_loop)

    custom_payload = PlainAuth(custom_connection).marshal()
    assert custom_payload.split(separator) == [b"", b"foo", b"bar"]

    overridden = PlainAuth(custom_connection)
    overridden.value = b"boo"
    assert overridden.marshal() == b"boo"
async def test_no_free_channels(amqp_connection: aiormq.Connection):
    """Exhaust every channel number, then expect the next open to be refused.

    Channels numbered ``1..channel_max`` (``channel_max`` is read from
    ``amqp_connection.connection_tune``) are requested concurrently and
    given up to 60 seconds to settle. A further ``channel()`` call without
    an explicit number must then raise
    ``aiormq.exceptions.ConnectionNotAllowed`` within 5 seconds.
    """
    channel_max = amqp_connection.connection_tune.channel_max

    # asyncio.wait() no longer accepts bare coroutine objects (deprecated in
    # Python 3.8, TypeError since 3.11). ensure_future() schedules coroutines
    # and other awaitables and passes already-created futures/tasks through
    # unchanged, so this works whatever channel() returns.
    pending = [
        asyncio.ensure_future(amqp_connection.channel(number + 1))
        for number in range(channel_max)
    ]
    await asyncio.wait_for(asyncio.wait(pending), timeout=60)

    with pytest.raises(aiormq.exceptions.ConnectionNotAllowed):
        await asyncio.wait_for(amqp_connection.channel(), timeout=5)
async def test_auth_plain(amqp_connection, event_loop):
    """Exercise ``PlainAuth`` against the fixture and a re-credentialed URL.

    The marshalled bytes are expected to split on ``b"\\x00"`` into
    ``[b"", <user>, <password>]``; an explicitly assigned ``value`` is
    expected to be returned by ``marshal()`` as-is.
    """
    first = PlainAuth(amqp_connection).marshal()
    second = PlainAuth(amqp_connection).marshal()
    # Marshalling must be repeatable for the same connection.
    assert first == second

    fields = first.split(b"\x00")
    assert fields == [b"", b"guest", b"guest"]

    # Build (but do not connect) a connection that carries other credentials.
    other = aiormq.Connection(
        amqp_connection.url.with_user("foo").with_password("bar"),
        loop=event_loop,
    )
    fields = PlainAuth(other).marshal().split(b"\x00")
    assert fields == [b"", b"foo", b"bar"]

    plain = PlainAuth(other)
    plain.value = b"boo"
    marshalled = plain.marshal()
    assert marshalled == b"boo"
# NOTE(review): the statements below have no enclosing ``def`` in this view.
# They repeat the body of test_auth_plain minus its header and its first
# assignment to ``auth`` -- presumably a truncated duplicate; confirm and
# either restore the missing header or drop the span.
assert auth == PlainAuth(amqp_connection).marshal()
# The marshalled payload is split on NUL and compared field by field.
auth_parts = auth.split(b"\x00")
assert auth_parts == [b"", b"guest", b"guest"]
# Connection object built from the fixture URL with other credentials.
connection = aiormq.Connection(
amqp_connection.url.with_user("foo").with_password("bar"),
loop=event_loop,
)
auth = PlainAuth(connection).marshal()
auth_parts = auth.split(b"\x00")
assert auth_parts == [b"", b"foo", b"bar"]
# An explicitly assigned ``value`` is expected back from marshal() unchanged.
auth = PlainAuth(connection)
auth.value = b"boo"
assert auth.marshal() == b"boo"
async def test_auth_plain(amqp_connection, event_loop):
    """Check the payload returned by ``PlainAuth.marshal()``.

    Verifies that marshalling is repeatable for the same connection, that
    the payload splits on NUL into ``[b"", user, password]`` for both the
    fixture connection (expected ``guest``/``guest``) and a connection
    built with ``foo``/``bar``, and that an explicitly assigned ``value``
    is what ``marshal()`` returns.
    """
    auth = PlainAuth(amqp_connection).marshal()
    assert auth == PlainAuth(amqp_connection).marshal()

    auth_parts = auth.split(b"\x00")
    assert auth_parts == [b"", b"guest", b"guest"]

    # Same URL with different credentials; the connection is never opened.
    connection = aiormq.Connection(
        amqp_connection.url.with_user("foo").with_password("bar"),
        loop=event_loop,
    )

    auth = PlainAuth(connection).marshal()
    auth_parts = auth.split(b"\x00")
    assert auth_parts == [b"", b"foo", b"bar"]

    auth = PlainAuth(connection)
    auth.value = b"boo"
    # Without this check the override above is set but never verified.
    assert auth.marshal() == b"boo"
# NOTE(review): these statements have no enclosing ``def`` in this view, and
# ``channel1`` / ``channel2`` / ``amqp_connection`` are not defined here --
# presumably the tail of a test whose beginning was cut off; confirm.
await channel1.close()
await channel2.close()
channel = await amqp_connection.channel()
# Consumed messages are handed to ``queue.put`` and read back below.
queue = asyncio.Queue()
deaclare_ok = await channel.queue_declare(auto_delete=True)
consume_ok = await channel.basic_consume(deaclare_ok.queue, queue.put)
await channel.basic_publish(b"foo", routing_key=deaclare_ok.queue)
message = await queue.get() # type: DeliveredMessage
assert message.body == b"foo"
# A mandatory publish to a routing key that differs from the declared queue
# name is expected to raise PublishError carrying the returned message.
with pytest.raises(aiormq.exceptions.PublishError) as e:
await channel.basic_publish(
b"bar", routing_key=deaclare_ok.queue + "foo", mandatory=True
)
message = e.value.message
assert message.delivery.routing_key == deaclare_ok.queue + "foo"
assert message.body == b"bar"
assert "'NO_ROUTE' for routing key" in repr(e.value)
cancel_ok = await channel.basic_cancel(consume_ok.consumer_tag)
assert cancel_ok.consumer_tag == consume_ok.consumer_tag
await channel.queue_delete(deaclare_ok.queue)
# NOTE(review): the fragment stops right after this publish; sibling tests in
# this file follow the same step with a basic_get and a body assertion.
deaclare_ok = await channel.queue_declare(auto_delete=True)
await channel.basic_publish(b"foo bar", routing_key=deaclare_ok.queue)
async def test_blank_body(amqp_channel: aiormq.Channel):
    """Publish an empty-body message and read it back through a consumer.

    After the consumer is cancelled and the queue deleted, a second
    auto-delete queue is declared and a ``b"foo bar"`` message is fetched
    from it with ``basic_get``.
    """
    await amqp_channel.basic_qos(prefetch_count=1)
    assert amqp_channel.number

    # Consumed deliveries are pushed into this queue by the consumer callback.
    inbox = asyncio.Queue()

    declared = await amqp_channel.queue_declare(auto_delete=True)
    consumer = await amqp_channel.basic_consume(declared.queue, inbox.put)

    props = aiormq.spec.Basic.Properties(message_id="123")
    await amqp_channel.basic_publish(
        b"", routing_key=declared.queue, properties=props,
    )

    delivered = await inbox.get()  # type: DeliveredMessage
    assert delivered.body == b""

    cancelled = await amqp_channel.basic_cancel(consumer.consumer_tag)
    assert cancelled.consumer_tag == consumer.consumer_tag
    assert cancelled.consumer_tag not in amqp_channel.consumers
    await amqp_channel.queue_delete(declared.queue)

    # Second phase: fetch synchronously from a freshly declared queue.
    declared = await amqp_channel.queue_declare(auto_delete=True)
    await amqp_channel.basic_publish(b"foo bar", routing_key=declared.queue)

    fetched = await amqp_channel.basic_get(declared.queue, no_ack=True)
    assert fetched.body == b"foo bar"
async def test_simple(amqp_channel: aiormq.Channel):
    """Round-trip ``b"foo"`` via a consumer, then ``b"foo bar"`` via basic_get.

    The consumer is cancelled and its queue deleted between the two
    phases; the cancelled consumer tag must no longer appear in
    ``amqp_channel.consumers``.
    """
    await amqp_channel.basic_qos(prefetch_count=1)
    assert amqp_channel.number

    received = asyncio.Queue()

    # Phase 1: push delivery through basic_consume.
    declare_ok = await amqp_channel.queue_declare(auto_delete=True)
    consume_ok = await amqp_channel.basic_consume(
        declare_ok.queue, received.put,
    )
    await amqp_channel.basic_publish(
        b"foo",
        routing_key=declare_ok.queue,
        properties=aiormq.spec.Basic.Properties(message_id="123"),
    )
    message = await received.get()  # type: DeliveredMessage
    assert message.body == b"foo"

    consumer_tag = consume_ok.consumer_tag
    cancel_ok = await amqp_channel.basic_cancel(consumer_tag)
    assert cancel_ok.consumer_tag == consume_ok.consumer_tag
    assert cancel_ok.consumer_tag not in amqp_channel.consumers
    await amqp_channel.queue_delete(declare_ok.queue)

    # Phase 2: pull delivery through basic_get on a new queue.
    declare_ok = await amqp_channel.queue_declare(auto_delete=True)
    await amqp_channel.basic_publish(
        b"foo bar", routing_key=declare_ok.queue,
    )
    message = await amqp_channel.basic_get(declare_ok.queue, no_ack=True)
    assert message.body == b"foo bar"
async def test_conncetion_reject(event_loop):
    """Expect ``ConnectionError`` when connecting to 127.0.0.1:59999.

    Both entry points are covered: the ``aiormq.connect`` helper and an
    explicit ``Connection.connect()`` scheduled as a task on the loop.
    """
    # Presumably nothing listens on this port in the test environment.
    target = "amqp://guest:guest@127.0.0.1:59999/"

    with pytest.raises(ConnectionError):
        await aiormq.connect(target, loop=event_loop)

    connection = aiormq.Connection(target, loop=event_loop)

    with pytest.raises(ConnectionError):
        connect_task = event_loop.create_task(connection.connect())
        await connect_task
async def test_bad_credentials(amqp_connection, event_loop):
    """Expect ``ProbableAuthenticationError`` for a random password.

    The fixture's URL is reused with its password replaced by a fresh
    UUID hex string, and ``connect()`` on that connection must fail.
    """
    wrong_password = uuid.uuid4().hex
    bad_url = amqp_connection.url.with_password(wrong_password)
    connection = aiormq.Connection(bad_url, loop=event_loop)

    with pytest.raises(aiormq.exceptions.ProbableAuthenticationError):
        await connection.connect()