Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _get_keystone_session(username, password, project_name):
    """Build a keystone ``Session`` authenticated as the given user.

    ``username`` and ``password`` are the keystone credentials and
    ``project_name`` is the project the session is scoped to.  The auth URL
    is read from the ``auth_url`` keystone config option, and both the user
    and the project are looked up in the ``default`` domain.
    """
    credentials = {
        'auth_url': _keystone_cfg_opt('auth_url'),
        'username': username,
        'password': password,
        'project_name': project_name,
        'user_domain_id': 'default',
        'project_domain_id': 'default',
    }
    password_auth = v3.Password(**credentials)
    return session.Session(auth=password_auth)
cls.auth = identity.Password(
auth_url=CONF.identity.uri,
username=CONF.keymanager.username,
password=CONF.keymanager.password,
tenant_name=CONF.keymanager.project_name)
else:
cls.auth = identity.Password(
auth_url=CONF.identity.uri,
username=CONF.keymanager.username,
user_domain_name=CONF.identity.domain_name,
password=CONF.keymanager.password,
project_name=CONF.keymanager.project_name,
project_domain_name=CONF.keymanager.project_domain_name)
# enables the tests in this class to share a keystone token
cls.sess = session.Session(auth=cls.auth)
def _get_keystone_stuff(self, ks_version, auth):
    """Return a ``(session, keystone_client)`` pair built from ``auth``.

    A session is created around the ``auth`` plugin, then a keystone client
    bound to that session: the v2 client when ``int(ks_version)`` equals 2,
    the v3 client for every other version.
    """
    keystone_session = session.Session(auth=auth)
    # Pick the client class first so there is a single construction site.
    client_cls = (keystone_v2_client.Client if int(ks_version) == 2
                  else keystone_v3_client.Client)
    return (keystone_session, client_cls(session=keystone_session))
def _get_keystone_client(site):
    """Build a keystone client for the deployment described by ``site``.

    ``site`` is a mapping that must provide ``username``, ``password`` and
    ``tenant_name``.

    * When ``keystone_version`` starts with ``3`` a session-based v3 client
      is returned.  It authenticates against ``keystone_auth_url`` when that
      key is present and against ``auth_url`` otherwise.  The optional
      ``user_domain_name`` and ``project_domain_name`` keys both default to
      ``'default'``.
    * For any other (or missing) version a v2 client is built directly from
      the credentials and ``auth_url``.

    Raises ``KeyError`` when a required key is missing from ``site``.
    """
    # Coerce to text so a version supplied as a number (e.g. 3 rather than
    # '3') still selects the v3 branch instead of failing on .startswith().
    version = str(site.get('keystone_version') or '')

    if not version.startswith('3'):
        return ks_v2_client.Client(
            username=site['username'],
            password=site['password'],
            tenant_name=site['tenant_name'],
            auth_url=site['auth_url'],
        )

    # Look up 'auth_url' only when it is actually needed: passing
    # site['auth_url'] as the default of dict.get() evaluates it eagerly and
    # raises KeyError for sites that define only 'keystone_auth_url'.
    if 'keystone_auth_url' in site:
        auth_url = site['keystone_auth_url']
    else:
        auth_url = site['auth_url']

    auth = ks_v3_identity.Password(
        auth_url=auth_url,
        username=site['username'],
        password=site['password'],
        user_domain_name=site.get('user_domain_name', 'default'),
        project_name=site['tenant_name'],
        project_domain_name=site.get('project_domain_name', 'default'),
    )
    session = ks_session.Session(auth=auth)
    return ks_v3_client.Client(session=session, version=(3,))
"tenant_name": os.environ.get("OS_PROJECT_NAME", opts.project_name),
}
user_domain_id = os.environ.get("OS_USER_DOMAIN_ID", opts.user_domain_id)
if user_domain_id:
auth_params["user_domain_id"] = user_domain_id
project_domain_id = os.environ.get("OS_PROJECT_DOMAIN_ID",
opts.project_domain_id)
if project_domain_id:
auth_params["project_domain_id"] = project_domain_id
auth = identity.Password(os.environ.get("OS_AUTH_URL", opts.auth_url),
**auth_params)
try:
sess = session.Session(auth=auth)
nc = client.Client(session=sess)
network_name = opts.network
network = nc.list_networks(name=network_name)['networks'][0]
print(network['provider:segmentation_id'])
create_body = {
'port':
{'network_id': network['id'],
'admin_state_up': True,
'name': 'generic_switch_test'
}
}
port_id = nc.create_port(create_body)['port']['id']
host = nc.list_agents(
def get_insecure_session(self):
    """Return the session stored under ``SESSION_TYPE_INSECURE``.

    If no (truthy) session is stored yet, a new one is created with
    ``verify=False``, registered through ``_set_session`` and returned.
    """
    # NOTE(review): verify=False presumably disables TLS certificate
    # verification for this session -- confirm against the keystone module.
    existing = self._sessions.get(SESSION_TYPE_INSECURE)
    if existing:
        return existing

    created = keystone.Session(verify=False)
    self._set_session(SESSION_TYPE_INSECURE, created)
    return created
def get_neutron_client(self):
    """Return a neutron v2.0 client built from ``self.os_kwargs``.

    Password authentication uses the ``auth_url``, ``username``,
    ``password``, ``domain_name`` (as the user domain) and ``project_id``
    entries.  The client itself is bound to ``region_name`` and uses
    ``identity_interface`` as its endpoint type.
    """
    # Imported at call time, not at module import.
    from keystoneauth1 import identity
    from keystoneauth1 import session
    from neutronclient.v2_0 import client

    password_auth = identity.Password(
        auth_url=self.os_kwargs["auth_url"],
        username=self.os_kwargs["username"],
        password=self.os_kwargs["password"],
        user_domain_name=self.os_kwargs["domain_name"],
        project_id=self.os_kwargs["project_id"],
    )
    keystone_session = session.Session(auth=password_auth)
    return client.Client(
        session=keystone_session,
        region_name=self.os_kwargs["region_name"],
        endpoint_type=self.os_kwargs["identity_interface"],
    )
if 'endpoint_type' in kwargs.keys():
self.client_kwargs['endpoint_type'] = kwargs.pop('endpoint_type')
if 'identity_api_version' in kwargs.keys():
kwargs.pop('identity_api_version')
if 'auth_version' in kwargs.keys():
kwargs.pop('auth_version')
if 'interface' in kwargs.keys():
self.client_kwargs['interface'] = kwargs.pop('interface')
self.compute_version = kwargs.pop('compute_api_version', 2)
self.image_version = kwargs.pop('image_api_version', 2)
self.volume_version = kwargs.pop('volume_api_version', 2)
self.neutron_version = kwargs.pop('neutron_api_version', 2)
self.auth = loader.load_from_options(auth_url=auth_url, **kwargs)
self.sess = session.Session(auth=self.auth, **session_kwargs)
def connect(self, cfg):
    """Return a ``GlanceClientWithSugar`` authenticated from ``cfg``.

    ``cfg`` must provide ``auth_url``, ``username``, ``password`` and
    ``tenant_name``; these are loaded into the ``password`` auth plugin and
    wrapped in a session.  When ``cfg`` has a truthy ``glance_url`` it is
    passed to the client as an explicit ``endpoint``.
    """
    password_loader = loading.get_plugin_loader('password')
    auth_plugin = password_loader.load_from_options(
        auth_url=cfg['auth_url'],
        username=cfg['username'],
        password=cfg['password'],
        tenant_name=cfg['tenant_name'],
    )
    glance_kwargs = {'session': session.Session(auth=auth_plugin)}

    # Only override the endpoint when one is explicitly configured.
    if cfg.get('glance_url'):
        glance_kwargs['endpoint'] = cfg['glance_url']
    return GlanceClientWithSugar(**glance_kwargs)
def _get_keystone_session(self, **kwargs):
# first create a Keystone session
cacert = self.options.os_cacert or None
cert = self.options.os_cert or None
if cert and self.options.os_key:
cert = cert, self.options.os_key
insecure = self.options.insecure or False
if insecure:
verify = False
else:
verify = cacert or True
ks_session = session.Session(verify=verify, cert=cert)
# discover the supported keystone versions using the given url
(v2_auth_url, v3_auth_url) = self._discover_auth_versions(
session=ks_session,
auth_url=self.options.os_auth_url)
username = self.options.os_username or None
user_domain_name = self.options.os_user_domain_name or None
user_domain_id = self.options.os_user_domain_id or None
auth = None
if v3_auth_url and v2_auth_url:
# support both v2 and v3 auth. Use v3 if possible.
if username:
if user_domain_name or user_domain_id:
# use v3 auth
auth = self.get_v3_auth(v3_auth_url)