Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@amqp('test.topic')
async def test(self, data: Any, metadata: Any, service: Any) -> None:
    """Record receipt of the expected payload on ``test.topic``.

    When ``data`` equals ``self.data_uuid`` the handler sets the
    received flag, stores ``metadata.get('topic')`` and
    ``service.get('uuid')`` on the instance, and calls
    ``self.check_closer()``. Any other payload is ignored.
    """
    is_expected = data == self.data_uuid
    if not is_expected:
        return

    self.test_topic_data_received = True
    self.test_topic_metadata_topic = metadata.get('topic')
    self.test_topic_service_uuid = service.get('uuid')
    # NOTE(review): the source listing had lost its indentation; this call
    # is assumed to belong to the matching branch -- confirm against upstream.
    self.check_closer()
@amqp('test.topic', ('data',))
async def test(self, data: Any) -> None:
    """Accept a ``test.topic`` message and deliberately do nothing with it."""
    return None
@amqp('test.raw.topic')
async def test(self, value: Any, default_value: bool = True) -> None:
    """Record receipt of the expected value on ``test.raw.topic``.

    When ``value`` equals ``self.data_uuid`` the handler stores
    ``default_value`` as the received flag, keeps ``value`` on the
    instance, and calls ``self.check_closer()``. Other values are ignored.
    """
    is_expected = value == self.data_uuid
    if not is_expected:
        return

    self.test_topic_data_received = default_value
    self.test_topic_data = value
    # NOTE(review): the source listing had lost its indentation; this call
    # is assumed to belong to the matching branch -- confirm against upstream.
    self.check_closer()
@amqp('test.custom.topic', message_protocol=CustomProtocol)
async def test(self, data: Any, protocol: Any, default_value: bool = True) -> None:
    """Record receipt of the expected payload on ``test.custom.topic``.

    The message is accepted only when ``data`` equals ``self.data_uuid``
    and ``protocol`` equals ``'custom'``. On acceptance the handler stores
    ``default_value`` as the received flag, keeps ``data`` on the instance,
    and calls ``self.check_closer()``.
    """
    # Same short-circuit order as a plain ``a and b`` test: the protocol
    # comparison only runs when the payload comparison is truthy.
    if not (data == self.data_uuid and protocol == 'custom'):
        return

    self.test_topic_data_received = default_value
    self.test_topic_data = data
    # NOTE(review): the source listing had lost its indentation; this call
    # is assumed to belong to the matching branch -- confirm against upstream.
    self.check_closer()
@amqp('test.#', ('metadata', 'data'))
async def wildcard_topic(self, metadata: Any, data: Any) -> None:
    """Accept a ``test.#`` message and deliberately do nothing with it."""
    return None
@amqp('test.#')
async def wildcard_topic(self, metadata: Any, data: Any) -> None:
    """Record receipt of the expected payload via the ``test.#`` pattern.

    When ``data`` equals ``self.data_uuid`` the handler sets
    ``self.wildcard_topic_data_received`` and calls
    ``self.check_closer()``. ``metadata`` is accepted but not used.
    """
    is_expected = data == self.data_uuid
    if not is_expected:
        return

    self.wildcard_topic_data_received = True
    # NOTE(review): the source listing had lost its indentation; this call
    # is assumed to belong to the matching branch -- confirm against upstream.
    self.check_closer()
@amqp('test.topic', queue_name='test-queue')
async def test_specified_queue_name(self, data: Any, metadata: Any, service: Any) -> None:
    """Record a single receipt of the expected payload on ``test-queue``.

    When ``data`` equals ``self.data_uuid`` the handler sets
    ``self.test_topic_specified_queue_name_data_received`` and calls
    ``self.check_closer()``. ``metadata`` and ``service`` are accepted
    but not used.

    Raises:
        Exception: if the expected payload arrives while the received
            flag is already set.
    """
    is_expected = data == self.data_uuid
    if not is_expected:
        return

    already_received = self.test_topic_specified_queue_name_data_received
    if already_received:
        raise Exception('test_topic_specified_queue_name_data_received already set')

    self.test_topic_specified_queue_name_data_received = True
    # NOTE(review): the source listing had lost its indentation; this call
    # is assumed to belong to the matching branch -- confirm against upstream.
    self.check_closer()