Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
from keystoneauth1 import _utils as utils
from keystoneauth1 import access
from keystoneauth1 import exceptions
from keystoneauth1.identity import base
_logger = utils.get_logger(__name__)
@six.add_metaclass(abc.ABCMeta)
class Auth(base.BaseIdentityPlugin):
"""Identity V2 Authentication Plugin.
:param string auth_url: Identity service endpoint for authorization.
:param string trust_id: Trust ID for trust scoping.
:param string tenant_id: Tenant ID for project scoping.
:param string tenant_name: Tenant name for project scoping.
:param bool reauthenticate: Allow fetching a new token if the current one
is going to expire. (optional) default True
"""
def __init__(self, auth_url,
trust_id=None,
def _initialize_loggers(self):
from keystoneauth1 import _utils
session_logger = _utils.get_logger('keystoneauth1.session')
session_logger.setLevel(settings.DEP_LOGGING_LEVEL)
auth1_logger = _utils.get_logger('keystoneauth1')
auth1_logger.setLevel(settings.DEP_LOGGING_LEVEL)
ksauth_logger = _utils.get_logger('keystoneauth')
ksauth_logger.setLevel(settings.DEP_LOGGING_LEVEL)
ks_identity_logger = _utils.get_logger('keystoneauth.identity.v3.base')
ks_identity_logger.setLevel(settings.DEP_LOGGING_LEVEL)
ostack_logger = _utils.get_logger('openstack')
ostack_logger.setLevel(settings.DEP_LOGGING_LEVEL)
ostack_session_logger = _utils.get_logger('openstack.session')
ostack_session_logger.setLevel(settings.DEP_LOGGING_LEVEL)
param.
- `allow_redirects` is ignored as redirects are handled
by the session.
:raises keystoneauth1.exceptions.base.ClientException: For connection
failure, or to indicate an error response code.
:returns: The response to the request.
"""
# If a logger is passed in, use it and do not log requests, responses
# and bodies separately.
if logger:
split_loggers = False
else:
split_loggers = None
logger = logger or utils.get_logger(__name__)
# NOTE(gmann): Convert r initlize the headers to
# CaseInsensitiveDict to make sure headers are
# case insensitive.
if kwargs.get('headers'):
kwargs['headers'] = requests.structures.CaseInsensitiveDict(
kwargs['headers'])
else:
kwargs['headers'] = requests.structures.CaseInsensitiveDict()
if connect_retries is None:
connect_retries = self._connect_retries
# HTTP 503 - Service Unavailable
retriable_status_codes = retriable_status_codes or [503]
rate_semaphore = rate_semaphore or self._rate_semaphore
headers = kwargs.setdefault('headers', dict())
if microversion:
from openstack import utils
from openstack.session import map_exceptions
from openstack.service_endpoint import endpoint as _endpoint
from keystoneauth1 import exceptions
from openstack.exceptions import MissingRequiredArgument
from openstack.identity import identity_service
EMPTYSTRING = ""
TERMINATORSTRING = "sdk_request"
ALGORITHM = "SDK-HMAC-SHA256"
PYTHON2 = "2"
UTF8 = "utf-8"
PROJECTIAMURL = "https://iam.%s.%s/v3/"
GLOBALIAMURL = "https://iam.%s/v3/"
_logger = log_utils.get_logger(__name__)
def construct_session(session_obj=None):
"""
# NOTE(morganfainberg): if the logic in this function changes be sure to
# update the betamax fixture's '_construct_session_with_betamax" function
# as well.
"""
if not session_obj:
session_obj = requests.Session()
# Use TCPKeepAliveAdapter to fix bug 1323862
for scheme in list(session_obj.adapters):
session_obj.mount(scheme, TCPKeepAliveAdapter())
return session_obj
def _initialize_loggers(self):
from keystoneauth1 import _utils
session_logger = _utils.get_logger('keystoneauth1.session')
session_logger.setLevel(settings.DEP_LOGGING_LEVEL)
auth1_logger = _utils.get_logger('keystoneauth1')
auth1_logger.setLevel(settings.DEP_LOGGING_LEVEL)
ksauth_logger = _utils.get_logger('keystoneauth')
ksauth_logger.setLevel(settings.DEP_LOGGING_LEVEL)
ks_identity_logger = _utils.get_logger('keystoneauth.identity.v3.base')
ks_identity_logger.setLevel(settings.DEP_LOGGING_LEVEL)
ostack_logger = _utils.get_logger('openstack')
ostack_logger.setLevel(settings.DEP_LOGGING_LEVEL)
ostack_session_logger = _utils.get_logger('openstack.session')
ostack_session_logger.setLevel(settings.DEP_LOGGING_LEVEL)
def _http_log_request(self, url, method=None, data=None,
json=None, headers=None, query_params=None,
logger=None, split_loggers=None):
string_parts = []
if self._get_split_loggers(split_loggers):
logger = utils.get_logger(__name__ + '.request')
else:
# Only a single logger was passed in, use string prefixing.
string_parts.append('REQ:')
if not logger:
logger = utils.get_logger(__name__)
if not logger.isEnabledFor(logging.DEBUG):
# NOTE(morganfainberg): This whole debug section is expensive,
# there is no need to do the work if we're not going to emit a
# debug log.
return
string_parts.append('curl -g -i')
# NOTE(jamielennox): None means let requests do its default validation
# so we need to actually check that this is False.