Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
LOG.debug('Running with parameter insecure = %s',
options.insecure)
if os.path.isfile(nova_cfg):
config = SafeConfigParser()
config.read(nova_cfg)
else:
LOG.error('Nova configuration file %s does not exist', nova_cfg)
sys.exit(1)
my_host = config.get('DEFAULT', 'host')
if not my_host:
# If host isn't set nova defaults to this
my_host = socket.gethostname()
loader = loading.get_plugin_loader('password')
auth = loader.load_from_options(
auth_url=config.get('neutron',
'auth_url'),
username=config.get('neutron',
'username'),
password=config.get('neutron',
'password'),
project_name=config.get('neutron',
'project_name'),
project_domain_name=config.get('neutron',
'project_domain_name'),
user_domain_name=config.get('neutron',
'user_domain_name'))
sess = session.Session(auth=auth, verify=options.insecure)
nova = client.Client('2.11', session=sess, endpoint_type='internal')
def __get_session(self):
if not self.__session:
self.__checkenv()
loader = keystoneLoading.get_plugin_loader('password')
auth = loader.load_from_options(username=os.environ.get('OS_USERNAME'),
password=os.environ.get('OS_PASSWORD'),
project_id=os.environ.get('OS_TENANT_ID'),
auth_url=os.environ.get('OS_AUTH_URL'))
self.__session = keystoneSession.Session(auth=auth)
return self.__session
def get_keystone_session():
auth_details = {}
auth_details['auth_url'] = CONF.keystone.auth_url
auth_details['username'] = CONF.keystone.username
auth_details['password'] = CONF.keystone.password
auth_details['project_name'] = CONF.keystone.project_name
auth_details['user_domain_name'] = CONF.keystone.user_domain_name
auth_details['project_domain_name'] = CONF.keystone.project_domain_name
loader = kaloading.get_plugin_loader('password')
auth_plugin = loader.load_from_options(**auth_details)
session = kaloading.session.Session().load_from_options(
auth=auth_plugin)
return session
def get_keystone_session():
"""Get trove service credential auth session."""
global _SESSION
if not _SESSION:
loader = loading.get_plugin_loader('password')
auth = loader.load_from_options(
username=CONF.service_credentials.username,
password=CONF.service_credentials.password,
project_name=CONF.service_credentials.project_name,
user_domain_name=CONF.service_credentials.user_domain_name,
project_domain_name=CONF.service_credentials.project_domain_name,
auth_url=CONF.service_credentials.auth_url)
_SESSION = session.Session(auth=auth)
return _SESSION
def create_designate_client(api_version='2'):
"""Creates a Designate DNSaaS client."""
loader = loading.get_plugin_loader('password')
auth = loader.load_from_options(auth_url=DNS_AUTH_URL,
username=DNS_USERNAME,
password=DNS_PASSKEY,
project_id=DNS_TENANT_ID,
user_domain_id=DNS_USER_DOMAIN_ID,
project_domain_id=DNS_PROJECT_DOMAIN_ID)
sesh = session.Session(auth=auth)
return client.Client(api_version, session=sesh)
def _get_auth_loader(self, config):
# Use the 'none' plugin for variants of None specified,
# since it does not look up endpoints or tokens but rather
# does a passthrough. This is useful for things like Ironic
# that have a keystoneless operational mode, but means we're
# still dealing with a keystoneauth Session object, so all the
# _other_ things (SSL arg handling, timeout) all work consistently
if config['auth_type'] in (None, "None", ''):
config['auth_type'] = 'none'
elif config['auth_type'] == 'token_endpoint':
# Humans have been trained to use a thing called token_endpoint
# That it does not exist in keystoneauth is irrelvant- it not
# doing what they want causes them sorrow.
config['auth_type'] = 'admin_token'
return loading.get_plugin_loader(config['auth_type'])
def get_session():
username = os.environ.get('OS_USERNAME')
password = os.environ.get('OS_PASSWORD')
project_name = os.environ.get('OS_PROJECT_NAME')
auth_url = os.environ.get('OS_AUTH_URL')
loader = loading.get_plugin_loader('password')
auth = loader.load_from_options(auth_url=auth_url,
username=username,
password=password,
project_name=project_name,
user_domain_id='default',
project_domain_id='default')
return session.Session(auth=auth)
def get_session():
username = os.environ.get('OS_USERNAME')
password = os.environ.get('OS_PASSWORD')
tenant_name = os.environ.get('OS_TENANT_NAME')
auth_url = os.environ.get('OS_AUTH_URL')
loader = loading.get_plugin_loader('password')
auth = loader.load_from_options(auth_url=auth_url,
username=username,
password=password,
project_name=tenant_name,
user_domain_id='default',
project_domain_id='default')
return session.Session(auth=auth)
# connection by mistake.
# Update: We are adding handling for casting 'True' or 'true' as a
# bool because if this value is set via an instrinsic function
# it will always be a string.
cfg_insecure = cfg[AUTH_PARAM_INSECURE]
if isinstance(cfg_insecure, basestring) and \
cfg_insecure.capitalize() == 'True':
cfg[AUTH_PARAM_INSECURE] = True
verify = not (cfg[AUTH_PARAM_INSECURE] is True)
del cfg[AUTH_PARAM_INSECURE]
elif AUTH_PARM_CA_CERT in cfg:
cfg = cfg.copy()
verify = cfg[AUTH_PARM_CA_CERT]
del cfg[AUTH_PARM_CA_CERT]
loader = loading.get_plugin_loader("password")
auth = loader.load_from_options(**cfg)
sess = session.Session(auth=auth, verify=verify)
return sess