Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
_get_value('access_key_secret'),
))
elif type_ == 'ecs_ram_role':
return InstanceProfileCredentialsProvider(_get_value('role_name'))
elif type_ == 'ram_role_arn':
return RamRoleCredentialsProvider(
self.client_config,
AccessKeyCredentials(
_get_value('access_key_id'),
_get_value('access_key_secret'),
), _get_value('role_arn'), role_session_name=_get_value('role_session_name'))
elif type_ == 'bearer_token':
return StaticCredentialsProvider(BearerTokenCredentials(
_get_value('bearer_token'),
))
elif type_ == 'rsa_key_pair':
raise ClientException("RSA Key Pair credentials are not supported.")
elif type_ == 'sts_token':
return StaticCredentialsProvider(SecurityCredentials(
_get_value('access_key_id'),
_get_value('access_key_secret'),
_get_value('security_token'),
))
else:
raise Exception("Unexpected credentials type: " + type_)
def _get_provider_by_profile(self, profile):
def _get_value(key):
if key not in profile:
raise ClientException(
"{0} must be set for credentials type {1}.".format(key, type_))
type_ = profile.get('type')
if not type_:
type_ = 'access_key' # use access_key for default type
if type_ == 'access_key':
return StaticCredentialsProvider(AccessKeyCredentials(
_get_value('access_key_id'),
_get_value('access_key_secret'),
))
elif type_ == 'ecs_ram_role':
return InstanceProfileCredentialsProvider(_get_value('role_name'))
elif type_ == 'ram_role_arn':
return RamRoleCredentialsProvider(AccessKeyCredentials(
_get_value('access_key_id'),
_get_value('access_key_secret'),
), _get_value('role_arn'), role_session_name=_get_value('role_session_name'))
elif type_ == 'bearer_token':
return StaticCredentialsProvider(BearerTokenCredentials(
_get_value('bearer_token'),
return StaticCredentialsProvider(AccessKeyCredentials(
_get_value('access_key_id'),
_get_value('access_key_secret'),
))
elif type_ == 'ecs_ram_role':
return InstanceProfileCredentialsProvider(_get_value('role_name'))
elif type_ == 'ram_role_arn':
return RamRoleCredentialsProvider(AccessKeyCredentials(
_get_value('access_key_id'),
_get_value('access_key_secret'),
), _get_value('role_arn'), role_session_name=_get_value('role_session_name'))
elif type_ == 'bearer_token':
return StaticCredentialsProvider(BearerTokenCredentials(
_get_value('bearer_token'),
))
elif type_ == 'rsa_key_pair':
raise ClientException("RSA Key Pair credentials are not supported.")
elif type_ == 'sts_token':
return StaticCredentialsProvider(SecurityCredentials(
_get_value('access_key_id'),
_get_value('access_key_secret'),
_get_value('security_token'),
))
else:
raise Exception("Unexpected credentials type: " + type_)
def _get_provider_by_profile(self, profile):
def _get_value(key):
if key not in profile:
raise ClientException(
"{0} must be set for credentials type {1}.".format(key, type_))
return profile[key]
type_ = profile.get('type')
if not type_:
type_ = 'access_key' # use access_key for default type
if type_ == 'access_key':
return StaticCredentialsProvider(AccessKeyCredentials(
_get_value('access_key_id'),
_get_value('access_key_secret'),
))
elif type_ == 'ecs_ram_role':
return InstanceProfileCredentialsProvider(_get_value('role_name'))
elif type_ == 'ram_role_arn':
return RamRoleCredentialsProvider(
self.client_config,
AccessKeyCredentials(
_get_value('access_key_id'),
_get_value('access_key_secret'),
), _get_value('role_arn'), role_session_name=_get_value('role_session_name'))
elif type_ == 'bearer_token':
elif type_ == 'ram_role_arn':
return RamRoleCredentialsProvider(AccessKeyCredentials(
_get_value('access_key_id'),
_get_value('access_key_secret'),
), _get_value('role_arn'), role_session_name=_get_value('role_session_name'))
elif type_ == 'bearer_token':
return StaticCredentialsProvider(BearerTokenCredentials(
_get_value('bearer_token'),
))
elif type_ == 'rsa_key_pair':
raise ClientException("RSA Key Pair credentials are not supported.")
elif type_ == 'sts_token':
return StaticCredentialsProvider(SecurityCredentials(
_get_value('access_key_id'),
_get_value('access_key_secret'),
_get_value('security_token'),
))
else:
raise Exception("Unexpected credentials type: " + type_)
self.client_config,
AccessKeyCredentials(
_get_value('access_key_id'),
_get_value('access_key_secret'),
), _get_value('role_arn'), role_session_name=_get_value('role_session_name'))
elif type_ == 'bearer_token':
return StaticCredentialsProvider(BearerTokenCredentials(
_get_value('bearer_token'),
))
elif type_ == 'rsa_key_pair':
raise ClientException("RSA Key Pair credentials are not supported.")
elif type_ == 'sts_token':
return StaticCredentialsProvider(SecurityCredentials(
_get_value('access_key_id'),
_get_value('access_key_secret'),
_get_value('security_token'),
))
else:
raise Exception("Unexpected credentials type: " + type_)
def __init__(self, client_config, access_key_credentials, role_arn,
             role_session_name='DefaultSessionName'):
    """Store the role settings and set up the credential fetcher.

    :param client_config: configuration object handed to the fetcher.
    :param access_key_credentials: credentials wrapped in a
        ``StaticCredentialsProvider`` for the fetcher to use.
    :param role_arn: stored as ``self.role_arn``; presumably the ARN of
        the role to assume -- confirm against ``AssumeRoleCaller``.
    :param role_session_name: stored as ``self.role_session_name``;
        defaults to ``'DefaultSessionName'``.
    """
    self.client_config = client_config
    self.access_key_credentials = access_key_credentials
    self.role_arn = role_arn
    self.role_session_name = role_session_name

    # The fetcher receives the access-key pair wrapped as a static provider.
    static_provider = StaticCredentialsProvider(access_key_credentials)
    self._fetcher = AssumeRoleCaller(client_config, static_provider)

    # Base-class initialisation runs last, using the class-level
    # SESSION_PERIOD and REFRESH_FACTOR settings.
    RotatingCredentialsProvider.__init__(
        self, self.SESSION_PERIOD, self.REFRESH_FACTOR)