Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _refresh_trustee_client(self):
# Remove project_name and project_id, since we need a trust scoped
# auth object
kwargs = {
'project_name': None,
'project_domain_name': None,
'project_id': None,
'trust_id': self.trust_id
}
trustee_auth = ka_loading.load_auth_from_conf_options(
CONF, 'keystone_authtoken', **kwargs)
return self._load_session(trustee_auth)
def _get_auth_plugin(self):
"""Load an auth plugin from CONF options."""
# If an auth plugin name is defined in `auth_type` option of [ironic]
# group, register its options and load it.
auth_plugin = ks_loading.load_auth_from_conf_options(CONF,
IRONIC_GROUP.name)
return auth_plugin
def __init__(self, **kwargs):
super(GnocchiStorage, self).__init__(**kwargs)
self.auth = ks_loading.load_auth_from_conf_options(
CONF,
GNOCCHI_STORAGE_OPTS)
self.session = ks_loading.load_session_from_conf_options(
CONF,
GNOCCHI_STORAGE_OPTS,
auth=self.auth)
self._conn = gclient.Client(
'1',
session=self.session,
adapter_options={'connect_retries': 3,
'interface': CONF.storage_gnocchi.interface})
self._measures = {}
self._archive_policy_name = (
CONF.storage_gnocchi.archive_policy_name)
self._archive_policy_definition = json.loads(
CONF.storage_gnocchi.archive_policy_definition)
def _load_ks_session(conf):
"""Load session.
This is either an authenticated session or a requests session, depending on
what's configured.
"""
global _ADMIN_AUTH
global _SESSION
if not _ADMIN_AUTH:
_ADMIN_AUTH = ks_loading.load_auth_from_conf_options(
conf, nova.conf.vendordata.vendordata_group.name)
if not _ADMIN_AUTH:
LOG.warning(_LW('Passing insecure dynamic vendordata requests '
'because of missing or incorrect service account '
'configuration.'))
if not _SESSION:
_SESSION = ks_loading.load_session_from_conf_options(
conf, nova.conf.vendordata.vendordata_group.name,
auth=_ADMIN_AUTH)
return _SESSION
def get_session(group):
auth = loading.load_auth_from_conf_options(CONF, group)
session = loading.load_session_from_conf_options(
CONF, group, auth=auth)
return session
def get_clients(context):
global _SESSION
if not _SESSION:
_SESSION = loading.load_session_from_conf_options(
CONF, 'designate')
auth = token_endpoint.Token(CONF.designate.url, context.auth_token)
client = d_client.Client(session=_SESSION, auth=auth)
if CONF.designate.auth_type:
admin_auth = loading.load_auth_from_conf_options(
CONF, 'designate')
else:
admin_auth = password.Password(
auth_url=CONF.designate.admin_auth_url,
username=CONF.designate.admin_username,
password=CONF.designate.admin_password,
tenant_name=CONF.designate.admin_tenant_name,
tenant_id=CONF.designate.admin_tenant_id)
admin_client = d_client.Client(session=_SESSION, auth=admin_auth,
endpoint_override=CONF.designate.url)
return client, admin_client
def load_auth(conf, group):
try:
auth = kaloading.load_auth_from_conf_options(conf, group)
except kaexception.MissingRequiredOptions as exc:
LOG.debug('Failed to load credentials from group %(grp)s: %(err)s',
{'grp': group, 'err': exc})
auth = None
return auth
def _get_clients(self):
p_client = placement_client.PlacementAPIClient(cfg.CONF)
n_auth = ks_loading.load_auth_from_conf_options(cfg.CONF, 'nova')
n_session = ks_loading.load_session_from_conf_options(
cfg.CONF,
'nova',
auth=n_auth)
extensions = [
ext for ext in nova_client.discover_extensions(NOVA_API_VERSION)
if ext.name == "server_external_events"]
n_client = nova_client.Client(
NOVA_API_VERSION,
session=n_session,
region_name=cfg.CONF.nova.region_name,
endpoint_type=cfg.CONF.nova.endpoint_type,
extensions=extensions)
return p_client, n_client
# placement, while it's still part of the nova project.
# Note that this might become the first thing we try if/as we move to
# using service types for conf group names in general.
confgrp = service_type
if not confgrp or not hasattr(CONF, confgrp):
raise exception.ConfGroupForServiceTypeNotFound(stype=service_type)
# Ensure we have an auth.
# NOTE(efried): This could be None, and that could be okay - e.g. if the
# result is being used for get_endpoint() and the conf only contains
# endpoint_override.
if not ksa_auth:
if ksa_session and ksa_session.auth:
ksa_auth = ksa_session.auth
else:
ksa_auth = ks_loading.load_auth_from_conf_options(CONF, confgrp)
if not ksa_session:
ksa_session = ks_loading.load_session_from_conf_options(
CONF, confgrp, auth=ksa_auth)
return ks_loading.load_adapter_from_conf_options(
CONF, confgrp, session=ksa_session, auth=ksa_auth,
min_version=min_version, max_version=max_version, raise_exc=False)