Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
parsed = parse.urlparse(url)
backend = parsed.scheme
query = parsed.query
# NOTE(flaper87): We need the following hack
# for python versions < 2.7.5. Previous versions
# of python parsed query params just for 'known'
# schemes. This was changed in this patch:
# http://hg.python.org/cpython/rev/79e6ff3d9afd
if not query and '?' in parsed.path:
query = parsed.path.split('?', 1)[-1]
parameters = parse.parse_qsl(query)
kwargs = {'options': dict(parameters)}
mgr = driver.DriverManager('zaqar.openstack.common.cache.backends', backend,
invoke_on_load=True,
invoke_args=[parsed],
invoke_kwds=kwargs)
return mgr.driver
def __init__(self, **kwargs):
    """Set up the amphora driver, the repositories and the task helpers.

    :param kwargs: forwarded unchanged to the parent class initializer.
    """
    super(BaseAmphoraTask, self).__init__(**kwargs)

    # The driver name comes from the controller_worker config group and is
    # looked up in the 'octavia.amphora.drivers' plugin namespace.
    driver_manager = stevedore_driver.DriverManager(
        namespace='octavia.amphora.drivers',
        name=CONF.controller_worker.amphora_driver,
        invoke_on_load=True,
    )
    self.amphora_driver = driver_manager.driver

    # Repository and utility objects used by the amphora tasks.
    self.amphora_repo = repo.AmphoraRepository()
    self.listener_repo = repo.ListenerRepository()
    self.loadbalancer_repo = repo.LoadBalancerRepository()
    self.task_utils = task_utilities.TaskUtils()
def make_provider(name: str, auth: dict, credentials: dict, settings: dict, **kwargs):
    r"""Returns an instance of :class:`waterbutler.core.provider.BaseProvider`

    The provider is looked up by ``name`` in the ``waterbutler.providers``
    plugin namespace and invoked with ``(auth, credentials, settings)`` as
    positional arguments and ``kwargs`` as keyword arguments.

    :param str name: The name of the provider to instantiate. (s3, box, etc)
    :param dict auth: first positional argument passed to the provider
    :param dict credentials: second positional argument passed to the provider
    :param dict settings: third positional argument passed to the provider
    :param dict \*\*kwargs: currently there to absorb ``callback_url``;
        forwarded to the provider as keyword arguments
    :rtype: :class:`waterbutler.core.provider.BaseProvider`
    :raises: :class:`exceptions.ProviderNotFound` if loading the provider
        raises ``RuntimeError``
    """
    try:
        manager = driver.DriverManager(
            namespace='waterbutler.providers',
            name=name,
            invoke_on_load=True,
            invoke_args=(auth, credentials, settings),
            invoke_kwds=kwargs,
        )
    except RuntimeError as exc:
        # Chain explicitly so the underlying loader failure stays attached
        # to the ProviderNotFound as its direct cause.
        raise exceptions.ProviderNotFound(name) from exc
    return manager.driver
def _prepare_worker(self, executor='blocking'):
    """Create the worker for the named executor and store it on the instance.

    :param executor: name of the entry in the 'kombu_driver.executors'
        namespace; defaults to 'blocking'.
    """
    manager = driver.DriverManager('kombu_driver.executors', executor)

    # Only non-blocking executors are given a worker count.
    options = (
        {} if executor == 'blocking'
        else {'max_workers': self._executor_threads}
    )

    # invoke_on_load is not requested above, so the loaded driver is called
    # here with the options to build the worker.
    self._worker = manager.driver(**options)
def __init__(self):
    """Initialise the SSH client, certificate manager and config templater."""
    super(HaproxyManager, self).__init__()
    self.amphoraconfig = {}

    # NOTE(review): AutoAddPolicy accepts host keys that were not known
    # beforehand -- confirm this is acceptable for these connections.
    self.client = paramiko.SSHClient()
    self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    # The certificate manager plugin is selected by the
    # certificates.cert_manager configuration option.
    cert_manager_loader = stevedore_driver.DriverManager(
        namespace='octavia.cert_manager',
        name=cfg.CONF.certificates.cert_manager,
        invoke_on_load=True,
    )
    self.cert_manager = cert_manager_loader.driver

    # self.amp_config is not assigned in this method; presumably it is
    # provided by the parent class -- verify against the class definition.
    templater_paths = {
        'base_amp_path': self.amp_config.base_path,
        'base_crt_dir': self.amp_config.base_cert_dir,
        'haproxy_template': self.amp_config.haproxy_template,
    }
    self.jinja = jinja_cfg.JinjaTemplater(**templater_paths)
@property
def api(self):
    """Return the API driver, loading it on first access.

    The cached value is returned as-is when it is already set or when no
    API module name is configured. Otherwise the module is looked up in the
    'zaqarclient.api' namespace and cached on the instance.

    :raises errors.DriverLoadFailure: if loading the driver raises
        ``RuntimeError``.
    """
    # Nothing to load: either already cached or no module name to look up.
    if self._api or not self._api_mod:
        return self._api

    try:
        manager = driver.DriverManager('zaqarclient.api',
                                       self._api_mod,
                                       invoke_on_load=True)
        self._api = manager.driver
    except RuntimeError as ex:
        raise errors.DriverLoadFailure(self._api_mod, ex)
    return self._api
def get_backend():
    """Return the migration backend, loading it on the first call.

    The result is cached in the module-level ``_IMPL`` so later calls reuse
    it for as long as the cached value is truthy.
    """
    global _IMPL
    if _IMPL:
        return _IMPL

    # invoke_on_load is not requested, so the manager's driver is stored
    # exactly as loaded.
    manager = driver.DriverManager("magnum.database.migration_backend",
                                   CONF.database.backend)
    _IMPL = manager.driver
    return _IMPL
def get_network_driver():
    """Load and return the network driver named in the configuration."""
    # Make sure the controller_worker option group is registered before
    # its network_driver option is read.
    CONF.import_group('controller_worker', 'octavia.common.config')

    manager = stevedore_driver.DriverManager(
        namespace='octavia.network.drivers',
        name=CONF.controller_worker.network_driver,
        invoke_on_load=True,
    )
    return manager.driver
def get_engine(conf):
    """Load the configured engine and return an instance.

    The engine name is the URL scheme of ``conf.database.connection``; it is
    looked up in ``STORAGE_ENGINE_NAMESPACE``.
    """
    connection_url = conf.database.connection
    engine_name = urlparse.urlparse(connection_url).scheme
    LOG.debug('looking for %r driver in %r',
              engine_name, STORAGE_ENGINE_NAMESPACE)
    manager = driver.DriverManager(
        STORAGE_ENGINE_NAMESPACE,
        engine_name,
        invoke_on_load=True,
    )
    return manager.driver
def get_backend():
    """Return the certificate manager plugin, loading it on the first call.

    The result is cached in the module-level ``_CERT_MANAGER_PLUGIN`` so
    later calls reuse it for as long as the cached value is truthy.
    """
    global _CERT_MANAGER_PLUGIN
    if _CERT_MANAGER_PLUGIN:
        return _CERT_MANAGER_PLUGIN

    # invoke_on_load is not requested, so the manager's driver is stored
    # exactly as loaded.
    manager = driver.DriverManager(
        "magnum.cert_manager.backend",
        CONF.certificates.cert_manager_type,
    )
    _CERT_MANAGER_PLUGIN = manager.driver
    return _CERT_MANAGER_PLUGIN