Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_publish_invalid_credentials(monkeypatch: Any, capsys: Any, loop: Any) -> None:
services, future = start_service('tests/services/dummy_service.py', monkeypatch)
instance = services.get('test_dummy')
with pytest.raises(AmqpException):
loop.run_until_complete(AmqpTransport.publish(instance, 'data', 'test.topic', wait=True))
async def _async_kill():
os.kill(os.getpid(), signal.SIGINT)
loop.create_task(_async_kill())
loop.run_until_complete(future)
out, err = capsys.readouterr()
assert 'Unable to connect [amqp] to 127.0.0.1:54321' in err
assert out == ''
pass
class AmqpInternalServiceException(AmqpInternalServiceError):
pass
class AmqpExclusiveQueueLockedException(AmqpException):
pass
class AmqpTooManyConsumersException(AmqpException):
pass
class AmqpConnectionException(AmqpException):
pass
class AmqpChannelClosed(AmqpException):
pass
class AmqpTransport(Invoker):
channel = None # type: Any
protocol = None # type: Any
transport = None # type: Any
@classmethod
async def publish(cls, service: Any, data: Any, routing_key: str = '', exchange_name: str = '', wait: bool = True, message_protocol: Any = MESSAGE_PROTOCOL_DEFAULT, routing_key_prefix: Optional[str] = MESSAGE_ROUTING_KEY_PREFIX, **kwargs: Any) -> None:
if not cls.channel:
await cls.connect(cls, service, service.context)
self._log_level = kwargs.get('log_level') if kwargs and kwargs.get('log_level') else 'INFO'
class AmqpInternalServiceError(AmqpException):
pass
class AmqpInternalServiceErrorException(AmqpInternalServiceError):
pass
class AmqpInternalServiceException(AmqpInternalServiceError):
pass
class AmqpExclusiveQueueLockedException(AmqpException):
pass
class AmqpTooManyConsumersException(AmqpException):
pass
class AmqpConnectionException(AmqpException):
pass
class AmqpChannelClosed(AmqpException):
pass
class AmqpTransport(Invoker):
pass
class AmqpInternalServiceErrorException(AmqpInternalServiceError):
pass
class AmqpInternalServiceException(AmqpInternalServiceError):
pass
class AmqpExclusiveQueueLockedException(AmqpException):
pass
class AmqpTooManyConsumersException(AmqpException):
pass
class AmqpConnectionException(AmqpException):
pass
class AmqpChannelClosed(AmqpException):
pass
class AmqpTransport(Invoker):
channel = None # type: Any
protocol = None # type: Any
transport = None # type: Any
import aioamqp
from tomodachi.helpers.dict import merge_dicts
from tomodachi.helpers.middleware import execute_middlewares
from tomodachi.invoker import Invoker
MESSAGE_PROTOCOL_DEFAULT = '2594418c-5771-454a-a7f9-8f83ae82812a'
MESSAGE_ROUTING_KEY_PREFIX = '38f58822-25f6-458a-985c-52701d40dbbc'
class AmqpException(Exception):
def __init__(self, *args: Any, **kwargs: Any) -> None:
self._log_level = kwargs.get('log_level') if kwargs and kwargs.get('log_level') else 'INFO'
class AmqpInternalServiceError(AmqpException):
pass
class AmqpInternalServiceErrorException(AmqpInternalServiceError):
pass
class AmqpInternalServiceException(AmqpInternalServiceError):
pass
class AmqpExclusiveQueueLockedException(AmqpException):
pass
class AmqpTooManyConsumersException(AmqpException):
pass
class AmqpExclusiveQueueLockedException(AmqpException):
pass
class AmqpTooManyConsumersException(AmqpException):
pass
class AmqpConnectionException(AmqpException):
pass
class AmqpChannelClosed(AmqpException):
pass
class AmqpTransport(Invoker):
channel = None # type: Any
protocol = None # type: Any
transport = None # type: Any
@classmethod
async def publish(cls, service: Any, data: Any, routing_key: str = '', exchange_name: str = '', wait: bool = True, message_protocol: Any = MESSAGE_PROTOCOL_DEFAULT, routing_key_prefix: Optional[str] = MESSAGE_ROUTING_KEY_PREFIX, **kwargs: Any) -> None:
if not cls.channel:
await cls.connect(cls, service, service.context)
exchange_name = exchange_name or cls.exchange_name or 'amq.topic'
message_protocol = getattr(service, 'message_protocol', None) if message_protocol == MESSAGE_PROTOCOL_DEFAULT else message_protocol
queue_name = cls.prefix_queue_name(queue_name, context)
amqp_arguments = {}
ttl = context.get('options', {}).get('amqp', {}).get('queue_ttl', 86400)
if ttl:
amqp_arguments['x-expires'] = int(ttl * 1000)
try:
data = await channel.queue_declare(queue_name, passive=passive, durable=durable, exclusive=exclusive, auto_delete=auto_delete, arguments=amqp_arguments)
if max_consumers is not None and data.get('consumer_count', 0) >= max_consumers:
logging.getLogger('transport.amqp').warning('Max consumers ({}) for queue [amqp] "{}" has been reached'.format(max_consumers, queue_name))
raise AmqpTooManyConsumersException("Max consumers for this queue has been reached")
except aioamqp.exceptions.ChannelClosed as e:
if e.args[0] == 405:
raise AmqpExclusiveQueueLockedException(str(e)) from e
raise AmqpException(str(e)) from e
await channel.queue_bind(queue_name, exchange_name or 'amq.topic', cls.encode_routing_key(cls.get_routing_key(routing_key, context)))
return queue_name