Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_first_account_alias_or_account_id():
    """Return the first IAM account alias, or the account id if there is none.

    The account id is the fifth ``:``-separated field of the caller identity
    ARN reported by STS.

    Every failure is logged and ends the process with ``sys.exit(1)``; the
    function never propagates an AWS error to its caller.
    """
    try:
        aliases = boto3.client('iam').list_account_aliases()["AccountAliases"]
        if aliases:
            return aliases[0]
        # The STS fallback deliberately lives inside the ``try`` body: an
        # exception raised from within an ``except`` handler is not seen by
        # the sibling handlers below, so a failing STS call would otherwise
        # escape without being logged.
        return boto3.client('sts').get_caller_identity()["Arn"].split(":")[4]
    except ProfileNotFound:
        LOGGER.error(
            "The AWS_PROFILE env var is set to '{0}' but this profile is not found in your ~/.aws/config".format(
                os.environ.get("AWS_PROFILE")))
        sys.exit(1)
    except (BotoCoreError, ClientError) as e:
        LOGGER.error(e)
        sys.exit(1)
    except Exception as e:
        LOGGER.error("Unknown error occurred loading users account alias")
        LOGGER.exception(e)
        LOGGER.info("Please report at https://github.com/cfn-sphere/cfn-sphere/issues!")
        sys.exit(1)
# Fragment of a larger routine: the enclosing definition, and the origin of
# `account`, `options`, `sts_client` and `logger`, are outside this view.
# `account` is accepted either as a dict carrying an "Id" key or as the bare id.
account_id = account["Id"] if isinstance(account, dict) else account
# work out if we need to call STS to assume a new role...
if options.role:
# call STS to get credentials for the account and role...
try:
arn = "arn:aws:iam::{}:role/{}".format(account_id, options.role)
logger.debug("Calling STS to get temporary credentials for: %s", arn)
# The same "<user>@<host>" string is sent as both the session name and the
# external id.
# NOTE(review): os.environ["USER"] raises KeyError when USER is unset, and
# that is not caught by the handlers below -- confirm this is acceptable.
assumed_role = sts_client.assume_role(
RoleArn=arn,
RoleSessionName="{}@{}".format(os.environ["USER"], socket.gethostname()),
ExternalId="{}@{}".format(os.environ["USER"], socket.gethostname())
)
# Both botocore failure types are reported on stdout and end the process.
except botocore.exceptions.BotoCoreError as be_bce:
print("error switching role ({}@{}): {}".format(options.role,
account_id, be_bce.args))
sys.exit(1)
except botocore.exceptions.ClientError as be_ce:
print("error switching role ({}@{}): {}".format(options.role,
account_id, be_ce.args))
sys.exit(1)
# A command descriptor is started for each requested region; the rest of the
# loop body is not visible in this view.
for region in options.regions:
logger.debug("Looking at region: %s", region)
cmd = {}
cmd["command"] = options.command
cmd["environment"] = {}
cmd["role"] = options.role
cmd["account_id"] = account_id
# Tail of a file-system update method: the method header and the code that
# initialises `params`, `changed`, `fs_id`, `name`, `throughput_mode` and
# `current_throughput` are outside this view.
params['ThroughputMode'] = throughput_mode
# Only request a provisioned throughput when one was given and it differs
# from the current value.
if provisioned_throughput_in_mibps and provisioned_throughput_in_mibps != current_throughput:
params['ProvisionedThroughputInMibps'] = provisioned_throughput_in_mibps
# Skip the API call entirely when no parameter was collected.
if len(params) > 0:
# wait_for receives a state getter, STATE_AVAILABLE and the timeout --
# presumably it blocks until the file system reports that state; confirm
# against wait_for's definition.
wait_for(
lambda: self.get_file_system_state(name),
self.STATE_AVAILABLE,
self.wait_timeout
)
try:
self.connection.update_file_system(FileSystemId=fs_id, **params)
changed = True
# ClientError carries a response dict, which is forwarded (snake_cased) to
# fail_json; BotoCoreError is reported with message and traceback only.
except ClientError as e:
self.module.fail_json(msg="Unable to update file system: {0}".format(to_native(e)),
exception=traceback.format_exc(), **camel_dict_to_snake_dict(e.response))
except BotoCoreError as e:
self.module.fail_json(msg="Unable to update file system: {0}".format(to_native(e)),
exception=traceback.format_exc())
return changed
"Unknown service: '{service_name}'. Valid service names are: "
"{known_service_names}")
class ApiVersionNotFoundError(BotoCoreError):
    """
    The data associated with either that API version or a compatible one
    could not be loaded.

    The names below match the placeholders used by ``fmt``.

    :ivar data_path: The data path that the user attempted to load.
    :ivar api_version: The API version that the user attempted to load.
    """
    fmt = 'Unable to load data {data_path} for: {api_version}'
class HTTPClientError(BotoCoreError):
    """Error raised on behalf of the underlying HTTP client.

    :ivar request: The request passed to the constructor, or ``None``.
    :ivar response: The response passed to the constructor, or ``None``.
    """
    # Message typo fixed: previously read "raised and unhandled exception".
    fmt = 'An HTTP Client raised an unhandled exception: {error}'

    def __init__(self, request=None, response=None, **kwargs):
        """Store ``request`` / ``response`` and forward ``kwargs`` to the base class."""
        self.request = request
        self.response = response
        super(HTTPClientError, self).__init__(**kwargs)

    def __reduce__(self):
        """Describe how to rebuild this exception when it is pickled.

        The positional constructor arguments (request, response) and the
        keyword arguments are handed separately to
        ``_exception_from_packed_args``.
        """
        return _exception_from_packed_args, (
            self.__class__, (self.request, self.response), self.kwargs)
class ConnectionError(BotoCoreError):
    """Error reported when the HTTP client cannot establish a connection.

    NOTE: inside this module the name shadows Python 3's builtin
    ``ConnectionError``.
    """

    fmt = "An HTTP Client failed to establish a connection: {error}"
class EndpointConnectionError(ConnectionError):
def describe_iam_roles(module, client):
    """Describe the IAM roles selected by the module parameters.

    When ``module.params['name']`` is set, only that role is fetched; a
    ``NoSuchEntity`` client error yields an empty list. Otherwise all roles
    are listed, optionally restricted to ``module.params['path_prefix']``,
    which is wrapped in leading and trailing slashes when they are missing.

    Other botocore errors are reported through ``module.fail_json_aws``.

    Returns a list with one snake_cased dict per role (``tags`` excluded
    from the key conversion).
    """
    role_name = module.params['name']
    prefix = module.params['path_prefix']

    if role_name:
        try:
            roles = [client.get_role(RoleName=role_name)['Role']]
        except botocore.exceptions.ClientError as err:
            # A missing role is an empty result, not a failure.
            if err.response['Error']['Code'] == 'NoSuchEntity':
                return []
            module.fail_json_aws(err, msg="Couldn't get IAM role %s" % role_name)
        except botocore.exceptions.BotoCoreError as err:
            module.fail_json_aws(err, msg="Couldn't get IAM role %s" % role_name)
    else:
        list_kwargs = {}
        if prefix:
            leading = '' if prefix.startswith('/') else '/'
            trailing = '' if prefix.endswith('/') else '/'
            list_kwargs['PathPrefix'] = leading + prefix + trailing
        try:
            roles = list_iam_roles_with_backoff(client, **list_kwargs)['Roles']
        except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as err:
            module.fail_json_aws(err, msg="Couldn't list IAM roles")

    described = []
    for role in roles:
        details = describe_iam_role(module, client, role)
        described.append(camel_dict_to_snake_dict(details, ignore_list=['tags']))
    return described
# Tail of a topic-attribute update method: the method header, the condition
# guarding the display-name update, and the origin of `topic_attributes` and
# `changed` are outside this view.
self.attributes_set.append('display_name')
# In check mode the change is recorded but no API call is made.
if not self.check_mode:
try:
self.connection.set_topic_attributes(TopicArn=self.topic_arn, AttributeName='DisplayName',
AttributeValue=self.display_name)
except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
self.module.fail_json_aws(e, msg="Couldn't set display name")
# A truthy compare_policies result is treated as "needs updating" --
# presumably it means the two policies differ; confirm against the helper.
if self.policy and compare_policies(self.policy, json.loads(topic_attributes['Policy'])):
changed = True
self.attributes_set.append('policy')
if not self.check_mode:
try:
self.connection.set_topic_attributes(TopicArn=self.topic_arn, AttributeName='Policy',
AttributeValue=json.dumps(self.policy))
except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
self.module.fail_json_aws(e, msg="Couldn't set topic policy")
# The delivery policy is also pushed when the topic has none at all yet.
if self.delivery_policy and ('DeliveryPolicy' not in topic_attributes or
compare_policies(self.delivery_policy, json.loads(topic_attributes['DeliveryPolicy']))):
changed = True
self.attributes_set.append('delivery_policy')
if not self.check_mode:
try:
self.connection.set_topic_attributes(TopicArn=self.topic_arn, AttributeName='DeliveryPolicy',
AttributeValue=json.dumps(self.delivery_policy))
except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
self.module.fail_json_aws(e, msg="Couldn't set topic delivery policy")
return changed
super(WaiterError, self).__init__(name=name, reason=reason)
self.last_response = last_response
class IncompleteReadError(BotoCoreError):
    """The HTTP response did not deliver the expected number of bytes.

    The message is built from the ``actual_bytes`` and ``expected_bytes``
    placeholders in ``fmt``.
    """

    fmt = "{actual_bytes} read, but total bytes expected is {expected_bytes}."
class InvalidExpressionError(BotoCoreError):
    """The given expression is invalid or too complex.

    The message is built from the ``expression`` placeholder in ``fmt``.
    """

    fmt = "Invalid expression {expression}: Only dotted lookups are supported."
class UnknownCredentialError(BotoCoreError):
    """An insert before/after referred to an unregistered credential type.

    The message is built from the ``name`` placeholder in ``fmt``.
    """

    fmt = "Credential named {name} not found."
class WaiterConfigError(BotoCoreError):
    """A waiter configuration could not be processed.

    The message is built from the ``error_msg`` placeholder in ``fmt``.
    """

    fmt = "Error processing waiter config: {error_msg}"
class UnknownClientMethodError(BotoCoreError):
    """A method was requested on a client that does not provide it.

    The message is built from the ``method_name`` placeholder in ``fmt``.
    """

    fmt = "Client does not have method: {method_name}"
class UnsupportedSignatureVersionError(BotoCoreError):
"""Error when trying to access a method on a client that does not exist."""
"""
fmt = ('Need to rewind the stream {stream_object}, but stream '
'is not seekable.')
class WaiterError(BotoCoreError):
    """A waiter did not reach its desired state.

    :ivar last_response: The ``last_response`` value given to the
        constructor.
    """

    fmt = "Waiter {name} failed: {reason}"

    def __init__(self, name, reason, last_response):
        """Format the message from ``name`` / ``reason``; keep ``last_response``."""
        message_fields = {"name": name, "reason": reason}
        super(WaiterError, self).__init__(**message_fields)
        self.last_response = last_response
class IncompleteReadError(BotoCoreError):
    """The HTTP response did not deliver the expected number of bytes.

    The message is built from the ``actual_bytes`` and ``expected_bytes``
    placeholders in ``fmt``.
    """

    fmt = "{actual_bytes} read, but total bytes expected is {expected_bytes}."
class InvalidExpressionError(BotoCoreError):
    """The given expression is invalid or too complex.

    The message is built from the ``expression`` placeholder in ``fmt``.
    """

    fmt = "Invalid expression {expression}: Only dotted lookups are supported."
class UnknownCredentialError(BotoCoreError):
    """An insert before/after referred to an unregistered credential type.

    The message is built from the ``name`` placeholder in ``fmt``.
    """

    fmt = "Credential named {name} not found."
class WaiterConfigError(BotoCoreError):
def modify_replication_subnet_group(module, connection):
    """Modify the DMS replication subnet group described by ``module``.

    The request parameters come from ``create_module_params(module)`` and are
    passed to ``replication_subnet_group_modify``, whose result is returned.

    Botocore failures are reported through ``module.fail_json``; for a
    ``ClientError`` the snake_cased error response is included as well.
    """
    try:
        request_kwargs = create_module_params(module)
        result = replication_subnet_group_modify(connection, **request_kwargs)
    except botocore.exceptions.ClientError as err:
        module.fail_json(
            msg="Failed to Modify the DMS replication subnet group.",
            exception=traceback.format_exc(),
            **camel_dict_to_snake_dict(err.response)
        )
    except botocore.exceptions.BotoCoreError:
        module.fail_json(
            msg="Failed to Modify the DMS replication subnet group.",
            exception=traceback.format_exc()
        )
    else:
        return result
def get_aws_account_id(module):
    """Return the ``Account`` field of the STS caller identity.

    An STS client is built from the module's AWS connection settings.
    Botocore failures while connecting or while querying the identity are
    reported through ``module.fail_json_aws``.
    """
    botocore_errors = (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError)

    try:
        region, ec2_url, aws_connect_kwargs = get_aws_connection_info(module, boto3=True)
        sts = boto3_conn(
            module,
            conn_type='client',
            resource='sts',
            region=region,
            endpoint=ec2_url,
            **aws_connect_kwargs
        )
    except botocore_errors as err:
        module.fail_json_aws(err, msg="Can't authorize connection")

    try:
        identity = sts.get_caller_identity()
        return identity['Account']
    except botocore_errors as err:
        module.fail_json_aws(err, msg="Couldn't obtain AWS account id")