Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_or_create_queue(queue_name):
region_name = get_aws_region_name()
sqs = boto3.resource('sqs', region_name=region_name)
try:
return sqs.get_queue_by_name(QueueName=queue_name)
except ClientError as exc:
non_existent_code = 'AWS.SimpleQueueService.NonExistentQueue'
if exc.response['Error']['Code'] == non_existent_code:
return sqs.create_queue(QueueName=queue_name)
else:
raise
def get_conn(region=None, access_key_id=None, secret_access_key=None):
if not region:
region = get_aws_region_name()
return boto3.client(
"sqs",
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key, region_name=region,
)