Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def bind(
self, exchange: ExchangeType_, routing_key: str = '', *,
arguments: dict = None, timeout: TimeoutType = None
) -> aiormq.spec.Exchange.BindOk:
""" A binding can also be a relationship between two exchanges.
This can be simply read as: this exchange is interested in messages
from another exchange.
Bindings can take an extra routing_key parameter. To avoid the confusion
with a basic_publish parameter we're going to call it a binding key.
.. code-block:: python
client = await connect()
routing_key = 'simple_routing_key'
src_exchange_name = "source_exchange"
dest_exchange_name = "destination_exchange"
async def delete(
self, if_unused: bool = False, timeout: TimeoutType = None
) -> aiormq.spec.Exchange.DeleteOk:
""" Delete the queue
:param timeout: operation timeout
:param if_unused: perform deletion when queue has no bindings.
"""
log.info("Deleting %r", self)
return await asyncio.wait_for(
self.channel.exchange_delete(self.name, if_unused=if_unused),
timeout=timeout
)
async def unbind(
self, exchange: ExchangeType_, routing_key: str = '',
arguments: dict = None, timeout: TimeoutType = None
) -> aiormq.spec.Exchange.UnbindOk:
""" Remove exchange-to-exchange binding for this
:class:`Exchange` instance
:param exchange: :class:`aio_pika.exchange.Exchange` instance
:param routing_key: routing key
:param arguments: additional arguments
:param timeout: execution timeout
:return: :class:`None`
"""
log.debug(
"Unbinding exchange %r from exchange %r, "
"routing_key=%r, arguments=%r",
self, exchange, routing_key, arguments
)
async def declare(
self, timeout: TimeoutType = None
) -> aiormq.spec.Exchange.DeclareOk:
return await asyncio.wait_for(self.channel.exchange_declare(
self.name,
exchange_type=self.__type,
durable=self.durable,
auto_delete=self.auto_delete,
internal=self.internal,
passive=self.passive,
arguments=self.arguments,
), timeout=timeout)