Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
task = task._coro
await asyncio.wait([asyncio.ensure_future(task)], timeout=3)
except Exception:
pass
cls.create_client(cls, 'sqs', context)
if retry >= 3:
raise e
continue
except botocore.exceptions.ClientError as e:
error_message = str(e)
logging.getLogger('transport.aws_sns_sqs').warning('Unable to delete message [sqs] on AWS ({})'.format(error_message))
except asyncio.TimeoutError as e:
if retry >= 3:
error_message = 'Network timeout'
logging.getLogger('transport.aws_sns_sqs').warning('Unable to delete message [sqs] on AWS ({})'.format(error_message))
raise AWSSNSSQSException(error_message, log_level=context.get('log_level')) from e
continue
try:
task = client.close()
if getattr(task, '_coro', None):
task = task._coro
await asyncio.wait([asyncio.ensure_future(task)], timeout=3)
except Exception:
pass
cls.create_client(cls, 'sns', context)
if retry >= 3:
raise e
continue
except (botocore.exceptions.ClientError, aiohttp.client_exceptions.ClientConnectorError, asyncio.TimeoutError) as e:
if retry >= 3:
error_message = str(e) if not isinstance(e, asyncio.TimeoutError) else 'Network timeout'
logging.getLogger('transport.aws_sns_sqs').warning('Unable to publish message [sns] on AWS ({})'.format(error_message))
raise AWSSNSSQSException(error_message, log_level=context.get('log_level')) from e
continue
break
message_id = response.get('MessageId')
if not message_id or not isinstance(message_id, str):
error_message = 'Missing MessageId in response'
logging.getLogger('transport.aws_sns_sqs').warning('Unable to publish message [sns] on AWS ({})'.format(error_message))
raise AWSSNSSQSException(error_message, log_level=context.get('log_level'))
return message_id
try:
response = await client.create_queue(QueueName=queue_name)
except (botocore.exceptions.NoCredentialsError, botocore.exceptions.PartialCredentialsError, aiohttp.client_exceptions.ClientOSError) as e:
error_message = str(e)
logging.getLogger('transport.aws_sns_sqs').warning('Unable to connect [sqs] to AWS ({})'.format(error_message))
raise AWSSNSSQSConnectionException(error_message, log_level=context.get('log_level')) from e
except botocore.exceptions.ClientError as e:
error_message = str(e)
logging.getLogger('transport.aws_sns_sqs').warning('Unable to create queue [sqs] on AWS ({})'.format(error_message))
raise AWSSNSSQSException(error_message, log_level=context.get('log_level')) from e
queue_url = response.get('QueueUrl')
if not queue_url:
error_message = 'Missing Queue URL in response'
logging.getLogger('transport.aws_sns_sqs').warning('Unable to create queue [sqs] on AWS ({})'.format(error_message))
raise AWSSNSSQSException(error_message, log_level=context.get('log_level'))
try:
response = await client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['QueueArn'])
except botocore.exceptions.ClientError as e:
error_message = str(e)
logging.getLogger('transport.aws_sns_sqs').warning('Unable to get queue attributes [sqs] on AWS ({})'.format(error_message))
raise AWSSNSSQSException(error_message, log_level=context.get('log_level')) from e
queue_arn = response.get('Attributes', {}).get('QueueArn')
if not queue_arn:
error_message = 'Missing ARN in response'
logging.getLogger('transport.aws_sns_sqs').warning('Unable to get queue attributes [sqs] on AWS ({})'.format(error_message))
raise AWSSNSSQSException(error_message, log_level=context.get('log_level'))
return queue_url, queue_arn
# MessageRetentionPeriod (default 4 days, set to context value)
# VisibilityTimeout (default 30 seconds)
response = await sqs_client.set_queue_attributes(QueueUrl=queue_url, Attributes={'Policy': json.dumps(queue_policy)})
except botocore.exceptions.ClientError as e:
error_message = str(e)
logging.getLogger('transport.aws_sns_sqs').warning('Unable to set queue attributes [sqs] on AWS ({})'.format(error_message))
raise AWSSNSSQSException(error_message, log_level=context.get('log_level')) from e
subscription_arn_list = []
for topic_arn in topic_arn_list:
try:
response = await client.subscribe(TopicArn=topic_arn, Protocol='sqs', Endpoint=queue_arn)
except botocore.exceptions.ClientError as e:
error_message = str(e)
logging.getLogger('transport.aws_sns_sqs').warning('Unable to subscribe to topic [sns] on AWS ({})'.format(error_message))
raise AWSSNSSQSException(error_message, log_level=context.get('log_level')) from e
subscription_arn = response.get('SubscriptionArn')
if not subscription_arn:
error_message = 'Missing Subscription ARN in response'
logging.getLogger('transport.aws_sns_sqs').warning('Unable to subscribe to topic [sns] on AWS ({})'.format(error_message))
raise AWSSNSSQSException(error_message, log_level=context.get('log_level'))
subscription_arn_list.append(subscription_arn)
return subscription_arn_list
if retry >= 3:
raise e
continue
except (botocore.exceptions.ClientError, aiohttp.client_exceptions.ClientConnectorError, asyncio.TimeoutError) as e:
if retry >= 3:
error_message = str(e) if not isinstance(e, asyncio.TimeoutError) else 'Network timeout'
logging.getLogger('transport.aws_sns_sqs').warning('Unable to publish message [sns] on AWS ({})'.format(error_message))
raise AWSSNSSQSException(error_message, log_level=context.get('log_level')) from e
continue
break
message_id = response.get('MessageId')
if not message_id or not isinstance(message_id, str):
error_message = 'Missing MessageId in response'
logging.getLogger('transport.aws_sns_sqs').warning('Unable to publish message [sns] on AWS ({})'.format(error_message))
raise AWSSNSSQSException(error_message, log_level=context.get('log_level'))
return message_id
from tomodachi.helpers.dict import merge_dicts
from tomodachi.helpers.middleware import execute_middlewares
from tomodachi.invoker import Invoker
DRAIN_MESSAGE_PAYLOAD = '__TOMODACHI_DRAIN__cdab4416-1727-4603-87c9-0ff8dddf1f22__'
MESSAGE_PROTOCOL_DEFAULT = 'e6fb6007-cf15-4cfd-af2e-1d1683374e70'
MESSAGE_TOPIC_PREFIX = '09698c75-832b-470f-8e05-96d2dd8c4853'
class AWSSNSSQSException(Exception):
def __init__(self, *args: Any, **kwargs: Any) -> None:
self._log_level = kwargs.get('log_level') if kwargs and kwargs.get('log_level') else 'INFO'
class AWSSNSSQSConnectionException(AWSSNSSQSException):
pass
class AWSSNSSQSInternalServiceError(AWSSNSSQSException):
pass
class AWSSNSSQSInternalServiceErrorException(AWSSNSSQSInternalServiceError):
pass
class AWSSNSSQSInternalServiceException(AWSSNSSQSInternalServiceError):
pass
class AWSSNSSQSTransport(Invoker):
raise AWSSNSSQSException(error_message, log_level=context.get('log_level')) from e
subscription_arn_list = []
for topic_arn in topic_arn_list:
try:
response = await client.subscribe(TopicArn=topic_arn, Protocol='sqs', Endpoint=queue_arn)
except botocore.exceptions.ClientError as e:
error_message = str(e)
logging.getLogger('transport.aws_sns_sqs').warning('Unable to subscribe to topic [sns] on AWS ({})'.format(error_message))
raise AWSSNSSQSException(error_message, log_level=context.get('log_level')) from e
subscription_arn = response.get('SubscriptionArn')
if not subscription_arn:
error_message = 'Missing Subscription ARN in response'
logging.getLogger('transport.aws_sns_sqs').warning('Unable to subscribe to topic [sns] on AWS ({})'.format(error_message))
raise AWSSNSSQSException(error_message, log_level=context.get('log_level'))
subscription_arn_list.append(subscription_arn)
return subscription_arn_list
DRAIN_MESSAGE_PAYLOAD = '__TOMODACHI_DRAIN__cdab4416-1727-4603-87c9-0ff8dddf1f22__'
MESSAGE_PROTOCOL_DEFAULT = 'e6fb6007-cf15-4cfd-af2e-1d1683374e70'
MESSAGE_TOPIC_PREFIX = '09698c75-832b-470f-8e05-96d2dd8c4853'
class AWSSNSSQSException(Exception):
def __init__(self, *args: Any, **kwargs: Any) -> None:
self._log_level = kwargs.get('log_level') if kwargs and kwargs.get('log_level') else 'INFO'
class AWSSNSSQSConnectionException(AWSSNSSQSException):
pass
class AWSSNSSQSInternalServiceError(AWSSNSSQSException):
pass
class AWSSNSSQSInternalServiceErrorException(AWSSNSSQSInternalServiceError):
pass
class AWSSNSSQSInternalServiceException(AWSSNSSQSInternalServiceError):
pass
class AWSSNSSQSTransport(Invoker):
clients = None
clients_creation_time = None
topics = {} # type: Dict[str, str]
close_waiter = None
async def setup_queue(topic: str, func: Callable, queue_name: Optional[str] = None, competing_consumer: Optional[bool] = None) -> str:
_uuid = obj.uuid
if queue_name and competing_consumer is False:
raise AWSSNSSQSException('Queue with predefined queue name must be competing', log_level=context.get('log_level'))
if queue_name is None:
queue_name = cls.get_queue_name(cls.encode_topic(topic), func.__name__, _uuid, competing_consumer, context)
else:
queue_name = cls.prefix_queue_name(queue_name, context)
queue_url, queue_arn = await cls.create_queue(cls, queue_name, context) # type: str, str
if re.search(r'([*#])', topic):
await cls.subscribe_wildcard_topic(cls, topic, queue_arn, queue_url, context)
else:
topic_arn = await cls.create_topic(cls, topic, context)
await cls.subscribe_topics(cls, (topic_arn,), queue_arn, queue_url, context)
return queue_url
error_message = 'Missing Queue URL in response'
logging.getLogger('transport.aws_sns_sqs').warning('Unable to create queue [sqs] on AWS ({})'.format(error_message))
raise AWSSNSSQSException(error_message, log_level=context.get('log_level'))
try:
response = await client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['QueueArn'])
except botocore.exceptions.ClientError as e:
error_message = str(e)
logging.getLogger('transport.aws_sns_sqs').warning('Unable to get queue attributes [sqs] on AWS ({})'.format(error_message))
raise AWSSNSSQSException(error_message, log_level=context.get('log_level')) from e
queue_arn = response.get('Attributes', {}).get('QueueArn')
if not queue_arn:
error_message = 'Missing ARN in response'
logging.getLogger('transport.aws_sns_sqs').warning('Unable to get queue attributes [sqs] on AWS ({})'.format(error_message))
raise AWSSNSSQSException(error_message, log_level=context.get('log_level'))
return queue_url, queue_arn