Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
expiration = data.get("Expiration")
if expiration:
self._expiration = time.mktime(time.strptime(expiration, '%Y-%m-%dT%H:%M:%SZ'))
else:
# FIXME Why?
self._expiration = expiration
return SecurityCredentials(
data['AccessKeyId'],
data['AccessKeySecret'],
data['SecurityToken'],
)
class ChainedCredentialsProvider(CredentialsProvider):
    """Provider that delegates to an ordered sequence of other providers."""

    def __init__(self, provider_chain):
        """Keep a reference to ``provider_chain``, the iterable of providers to try."""
        self._provider_chain = provider_chain

    def provide(self):
        """Return the first truthy result of ``provide()`` along the chain.

        Providers are queried in iteration order and the search stops at the
        first one whose result is truthy. Returns ``None`` when every provider
        yields a falsy result or the chain is empty.
        """
        for provider in self._provider_chain:
            credentials = provider.provide()
            if credentials:
                return credentials
        # Made explicit: no provider in the chain produced credentials.
        return None
class PredefinedChainCredentialsProvider(ChainedCredentialsProvider):
def __init__(self, client_config, credentials_config_file_name, profile_name, role_name):
provider_chain = [
EnvCredentialsProvider(),
class CredentialsProvider(object):
    """Base class for credentials providers; subclasses override ``provide``."""

    def provide(self):
        """Return credentials. The base implementation is not usable.

        Raises:
            NotImplementedError: always, naming the concrete class that
                failed to override this method.
        """
        raise NotImplementedError(
            "%s does not implement provide()" % type(self).__name__
        )
class StaticCredentialsProvider(CredentialsProvider):
    """Provider that always hands back the credentials object it was given."""

    def __init__(self, credentials):
        """Store ``credentials`` for later retrieval through ``provide``."""
        self.credentials = credentials

    def provide(self):
        """Return the stored credentials object, unchanged."""
        return self.credentials
class CachedCredentialsProvider(CredentialsProvider):
    """Provider that returns whatever is held in ``_cached_credentials``."""

    def __init__(self):
        # Nothing is cached at construction time.
        self._cached_credentials = None

    def provide(self):
        """Return the cached credentials, or ``None`` if nothing is stored."""
        return self._cached_credentials
class RotatingCredentialsProvider(CachedCredentialsProvider):
    """Cached provider that additionally records rotation bookkeeping values.

    Only the constructor is visible in this excerpt; how the stored values
    drive a refresh is not shown here.
    """

    def __init__(self, period, refresh_factor):
        """Initialise the cache slot and the rotation bookkeeping attributes.

        Args:
            period: stored as ``_period``.
            refresh_factor: stored as ``_refresh_factor``.
        """
        # Explicit base-class call (rather than super()) matches the rest of
        # the file and initialises ``_cached_credentials``.
        CachedCredentialsProvider.__init__(self)
        self._period = period
        self._refresh_factor = refresh_factor
        # Both timestamps start at zero.
        self._last_update_time = 0
        self._expiration = 0
# TODO remove aliyunsdkcore dependency
from aliyunsdkcore.request import CommonRequest
from aliyunsdkcore.request import RpcRequest
from aliyunsdkcore.auth.algorithm import sha_hmac256
from alibabacloud.utils.ini_helper import load_config
class CredentialsProvider(object):
    """Base class for credentials providers; subclasses override ``provide``."""

    def provide(self):
        """Return credentials. The base implementation is not usable.

        Raises:
            NotImplementedError: always, naming the concrete class that
                failed to override this method.
        """
        raise NotImplementedError(
            "%s does not implement provide()" % type(self).__name__
        )
class StaticCredentialsProvider(CredentialsProvider):
    """Provider that always hands back the credentials object it was given."""

    def __init__(self, credentials):
        """Store ``credentials`` for later retrieval through ``provide``."""
        self.credentials = credentials

    def provide(self):
        """Return the stored credentials object, unchanged."""
        return self.credentials
class CachedCredentialsProvider(CredentialsProvider):
    """Provider that returns whatever is held in ``_cached_credentials``."""

    def __init__(self):
        # Nothing is cached at construction time.
        self._cached_credentials = None

    def provide(self):
        """Return the cached credentials, or ``None`` if nothing is stored."""
        return self._cached_credentials
self._fetcher = AssumeRoleCaller(client_config,
StaticCredentialsProvider(access_key_credentials))
RotatingCredentialsProvider.__init__(self, self.SESSION_PERIOD, self.REFRESH_FACTOR)
def rotate_credentials(self):
    """Fetch a fresh set of credentials and wrap them in ``SecurityCredentials``.

    Calls ``self._fetcher.fetch`` with ``role_arn``, ``role_session_name``
    and ``SESSION_PERIOD``, parses the HTTP response text as JSON, and reads
    ``AccessKeyId``, ``AccessKeySecret`` and ``SecurityToken`` from its
    ``"Credentials"`` entry.

    Raises:
        AttributeError: if the parsed response has no ``"Credentials"``
            entry (same failure mode as before the lookup was hoisted).
    """
    context = self._fetcher.fetch(self.role_arn, self.role_session_name,
                                  self.SESSION_PERIOD)
    response = json.loads(context.http_response.text)
    # Look the nested mapping up once instead of once per field.
    credentials = response.get("Credentials")
    return SecurityCredentials(
        credentials.get("AccessKeyId"),
        credentials.get("AccessKeySecret"),
        credentials.get("SecurityToken"),
    )
class ProfileCredentialsProvider(CredentialsProvider):
def __init__(self, client_config, credentials_config_file_name, profile_name):
self.environ = os.environ
profile = self._load_profile(credentials_config_file_name, profile_name)
self.client_config = client_config
self._inner_provider = self._get_provider_by_profile(profile)
@staticmethod
def _load_profile(config_file_name, profile_name):
full_path = os.path.expanduser(config_file_name)
if not os.path.isfile(full_path):
raise ClientException("Failed to find config file for SDK: " + full_path)
config = load_config(full_path)
profile = config.get(profile_name, {})
if 'type' not in profile:
raise ClientException('The Credentials file (%s) can not find the needed param "type".'
self.set_protocol_type('https')
def get_duration_seconds(self):
    """Return the ``DurationSeconds`` query parameter, or ``None`` if unset."""
    return self.get_query_params().get("DurationSeconds")
def set_duration_seconds(self, duration_seconds):
    """Record ``duration_seconds`` as the ``DurationSeconds`` query parameter."""
    self.add_query_param('DurationSeconds', duration_seconds)
def get_public_key_id(self):
    """Return the ``PublicKeyId`` query parameter, or ``None`` if unset."""
    return self.get_query_params().get('PublicKeyId')
def set_public_key_id(self, public_key_id):
    """Record ``public_key_id`` as the ``PublicKeyId`` query parameter."""
    self.add_query_param('PublicKeyId', public_key_id)
class ProfileCredentialsProvider(CredentialsProvider):
def __init__(self, config_file_name, profile_name):
self.environ = os.environ
profile = self._load_profile(config_file_name, profile_name)
self._inner_provider = self._get_provider_by_profile(profile)
def _load_profile(self, config_file_name, profile_name):
full_path = os.path.expanduser(config_file_name)
if not os.path.isfile(full_path):
raise ClientException("Failed to find config file for SDK: " + full_path)
config = load_config(full_path)
profile = config.get(profile_name, {})
if 'type' not in profile:
raise ClientException(
'Credentials',
'The Credentials file (%s) can not find the needed param "type".' % full_path
from alibabacloud.exceptions import ClientException
from alibabacloud.credentials import AccessKeyCredentials
from alibabacloud.credentials import BearerTokenCredentials
from alibabacloud.credentials import SecurityCredentials
from alibabacloud.utils.ini_helper import load_config
from alibabacloud.credentials.assume_role_caller import AssumeRoleCaller
class CredentialsProvider(object):
    """Base class for credentials providers; subclasses override ``provide``."""

    def provide(self):
        """Return credentials. The base implementation is not usable.

        Raises:
            NotImplementedError: always, naming the concrete class that
                failed to override this method.
        """
        raise NotImplementedError(
            "%s does not implement provide()" % type(self).__name__
        )
class StaticCredentialsProvider(CredentialsProvider):
    """Provider that always hands back the credentials object it was given."""

    def __init__(self, credentials):
        """Store ``credentials`` for later retrieval through ``provide``."""
        self.credentials = credentials

    def provide(self):
        """Return the stored credentials object, unchanged."""
        return self.credentials
class CachedCredentialsProvider(CredentialsProvider):
    """Provider that returns whatever is held in ``_cached_credentials``."""

    def __init__(self):
        # Nothing is cached at construction time.
        self._cached_credentials = None

    def provide(self):
        """Return the cached credentials, or ``None`` if nothing is stored."""
        return self._cached_credentials
class CredentialsProvider(object):
    """Base class for credentials providers; subclasses override ``provide``."""

    def provide(self):
        """Return credentials. The base implementation is not usable.

        Raises:
            NotImplementedError: always, naming the concrete class that
                failed to override this method.
        """
        raise NotImplementedError(
            "%s does not implement provide()" % type(self).__name__
        )
class StaticCredentialsProvider(CredentialsProvider):
    """Provider that always hands back the credentials object it was given."""

    def __init__(self, credentials):
        """Store ``credentials`` for later retrieval through ``provide``."""
        self.credentials = credentials

    def provide(self):
        """Return the stored credentials object, unchanged."""
        return self.credentials
class CachedCredentialsProvider(CredentialsProvider):
    """Provider that returns whatever is held in ``_cached_credentials``."""

    def __init__(self):
        # Nothing is cached at construction time.
        self._cached_credentials = None

    def provide(self):
        """Return the cached credentials, or ``None`` if nothing is stored."""
        return self._cached_credentials
class RotatingCredentialsProvider(CachedCredentialsProvider):
    """Cached provider that additionally records rotation bookkeeping values.

    Only the constructor is visible in this excerpt; how the stored values
    drive a refresh is not shown here.
    """

    def __init__(self, period, refresh_factor):
        """Initialise the cache slot and the rotation bookkeeping attributes.

        Args:
            period: stored as ``_period``.
            refresh_factor: stored as ``_refresh_factor``.
        """
        # Explicit base-class call (rather than super()) matches the rest of
        # the file and initialises ``_cached_credentials``.
        CachedCredentialsProvider.__init__(self)
        self._period = period
        self._refresh_factor = refresh_factor
        # Both timestamps start at zero.
        self._last_update_time = 0
        self._expiration = 0
expiration = data.get("Expiration")
if expiration:
self._expiration = time.mktime(time.strptime(expiration, '%Y-%m-%dT%H:%M:%SZ'))
else:
# FIXME Why?
self._expiration = expiration
return SecurityCredentials(
data['AccessKeyId'],
data['AccessKeySecret'],
data['SecurityToken'],
)
class ChainedCredentialsProvider(CredentialsProvider):
    """Provider that delegates to an ordered sequence of other providers."""

    def __init__(self, provider_chain):
        """Keep a reference to ``provider_chain``, the iterable of providers to try."""
        self._provider_chain = provider_chain

    def provide(self):
        """Return the first truthy result of ``provide()`` along the chain.

        Providers are queried in iteration order and the search stops at the
        first one whose result is truthy. Returns ``None`` when every provider
        yields a falsy result or the chain is empty.
        """
        for provider in self._provider_chain:
            credentials = provider.provide()
            if credentials:
                return credentials
        # Made explicit: no provider in the chain produced credentials.
        return None
class PredefinedChainCredentialsProvider(ChainedCredentialsProvider):
def __init__(self, config_file_name, profile_name, role_name):
provider_chain = [
EnvCredentialsProvider(),