Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@aws_sns_sqs('example-pubsub-callback', competing=True)
async def callback(self, data: Any) -> None:
self.log('Received data (function: callback) - "{}"'.format(data))
@aws_sns_sqs('example-pubsub-new-message', competing=True)
async def new_message(self, data: Any) -> None:
self.log('Received data (function: new_message) - "{}"'.format(data))
callback_data = 'message received: "{}"'.format(data)
await aws_sns_sqs_publish(self, callback_data, topic='example-pubsub-callback', wait=True)
@aws_sns_sqs('example-#')
async def wildcard_route(self, metadata: Dict, data: Any) -> None:
self.log('Received data (function: wildcard_route, topic: {}) - "{}"'.format(metadata.get('topic', ''), data))
@aws_sns_sqs('example-route1')
async def route1b(self, data: Any) -> None:
self.log('Received data (function: route1b) - "{}"'.format(data))
@aws_sns_sqs('services-registration-deregister', queue_name='registration-service--deregister', competing=True)
async def deregister(self, data: Dict) -> None:
self.log('Deregister service "{}" [id: {}]'.format(data.get('name'), data.get('uuid')))
@aws_sns_sqs('example-route2')
async def route2(self, data: Any) -> None:
self.log('Received data (function: route2) - "{}"'.format(data))
@aws_sns_sqs('example-route1')
async def route1a(self, data: Any) -> None:
self.log('Received data (function: route1a) - "{}"'.format(data))
@aws_sns_sqs('services-registration-register', queue_name='registration-service--register', competing=True)
async def register(self, data: Dict) -> None:
self.log('Register service "{}" [id: {}]'.format(data.get('name'), data.get('uuid')))