Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@aws_sns_sqs('test-custom-topic', message_protocol=CustomProtocol)
async def test(self, data: Any, protocol: Any, default_value: bool = True) -> None:
    """Record a message received on 'test-custom-topic' via the custom protocol.

    The message is only recorded when ``data`` equals ``self.data_uuid`` and
    ``protocol`` equals ``'custom'``; anything else is ignored.

    Args:
        data: Payload handed to the handler.
        protocol: Protocol identifier handed to the handler.
        default_value: Value stored in ``self.test_topic_data_received``.
    """
    is_expected = data == self.data_uuid and protocol == 'custom'
    if not is_expected:
        return

    self.test_topic_data_received = default_value
    self.test_topic_data = data
    # NOTE(review): source indentation was lost; check_closer() is assumed to
    # run only for matching messages — confirm against the original file.
    self.check_closer()
@aws_sns_sqs('test-topic')
async def test(self, data: Any, metadata: Any, service: Any) -> None:
    """Record a message received on 'test-topic' together with its context.

    When ``data`` equals ``self.data_uuid`` the handler flags the message as
    received and stores ``metadata.get('topic')`` and ``service.get('uuid')``
    on the instance. Other payloads are ignored.

    Args:
        data: Payload handed to the handler.
        metadata: Object exposing ``get``; its ``'topic'`` entry is stored.
        service: Object exposing ``get``; its ``'uuid'`` entry is stored.
    """
    is_expected = data == self.data_uuid
    if not is_expected:
        return

    # The flag is set before the lookups, matching the original statement order.
    self.test_topic_data_received = True
    self.test_topic_metadata_topic = metadata.get('topic')
    self.test_topic_service_uuid = service.get('uuid')
    # NOTE(review): source indentation was lost; check_closer() is assumed to
    # run only for matching messages — confirm against the original file.
    self.check_closer()
@aws_sns_sqs('test-topic')
async def test(self, default_value: bool = True) -> None:
    """Store ``default_value`` in ``self.function_tested``.

    Args:
        default_value: Value written to ``self.function_tested``.
    """
    self.function_tested = default_value
@aws_sns_sqs('test-topic', ('data',))
async def test(self, data: Any) -> None:
    """Accept a message for 'test-topic' and deliberately do nothing with it.

    Args:
        data: Payload handed to the handler; unused.
    """
@aws_sns_sqs('test-raw-topic')
async def test(self, value: Any, default_value: bool = True) -> None:
    """Record a message received on 'test-raw-topic'.

    The message is only recorded when ``value`` equals ``self.data_uuid``.

    Args:
        value: Payload handed to the handler.
        default_value: Value stored in ``self.test_topic_data_received``.
    """
    is_expected = value == self.data_uuid
    if not is_expected:
        return

    self.test_topic_data_received = default_value
    self.test_topic_data = value
    # NOTE(review): source indentation was lost; check_closer() is assumed to
    # run only for matching messages — confirm against the original file.
    self.check_closer()
@aws_sns_sqs('test-topic#', ('metadata', 'data'))
async def faked_wildcard_topic(self, metadata: Any, data: Any) -> None:
    """Accept a message for 'test-topic#' and deliberately do nothing with it.

    Args:
        metadata: Metadata handed to the handler; unused.
        data: Payload handed to the handler; unused.
    """
@aws_sns_sqs('test-topic#')
async def faked_wildcard_topic(self, metadata: Any, data: Any) -> None:
    """Flag that the expected payload arrived on 'test-topic#'.

    Sets ``self.wildcard_topic_data_received`` when ``data`` equals
    ``self.data_uuid``; other payloads are ignored.

    Args:
        metadata: Metadata handed to the handler; unused.
        data: Payload handed to the handler.
    """
    is_expected = data == self.data_uuid
    if not is_expected:
        return

    self.wildcard_topic_data_received = True
    # NOTE(review): source indentation was lost; check_closer() is assumed to
    # run only for matching messages — confirm against the original file.
    self.check_closer()
@aws_sns_sqs('test-topic-unique', queue_name='test-queue-{}'.format(data_uuid))
async def test_specified_queue_name(self, data: Any, metadata: Any, service: Any) -> None:
    """Record the first matching message received on 'test-topic-unique'.

    Payloads that differ from ``self.data_uuid`` are ignored. A second
    matching payload is treated as an error.

    Args:
        data: Payload handed to the handler.
        metadata: Metadata handed to the handler; unused.
        service: Service context handed to the handler; unused.

    Raises:
        Exception: If ``self.test_topic_specified_queue_name_data_received``
            is already set when a matching payload arrives.
    """
    is_expected = data == self.data_uuid
    if not is_expected:
        return

    already_received = self.test_topic_specified_queue_name_data_received
    if already_received:
        raise Exception('test_topic_specified_queue_name_data_received already set')

    self.test_topic_specified_queue_name_data_received = True
    # NOTE(review): source indentation was lost; check_closer() is assumed to
    # run only for matching messages — confirm against the original file.
    self.check_closer()