Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _load_drivers(self):
    """Load the enabled module drivers and register each of them.

    A stevedore ``EnabledExtensionManager`` is built for
    ``self.MODULE_DRIVER_NAMESPACE`` with ``self._check_extension`` as its
    ``check_func``; extensions are invoked on load with no keyword
    arguments.  Every loaded extension is then handed to
    ``self.add_driver_extension``.  If the manager raises ``NoMatches``
    the condition is logged at info level instead of propagating.
    """
    manager_options = {
        'namespace': self.MODULE_DRIVER_NAMESPACE,
        'check_func': self._check_extension,
        'invoke_on_load': True,
        'invoke_kwds': {},
    }
    extensions = stevedore.enabled.EnabledExtensionManager(**manager_options)

    try:
        extensions.map(self.add_driver_extension)
    except stevedore.exception.NoMatches:
        # Having no drivers is not an error here; just record it.
        LOG.info("No module drivers loaded")
def load_driver(driver_name, *args):
    """Return the driver named ``driver_name`` from the unified limit models.

    The lookup is done with a stevedore ``DriverManager`` in the
    ``keystone.unified_limit.model`` namespace; the driver is invoked on
    load with ``args`` as its positional arguments.

    :param driver_name: entry point name of the driver to load.
    :param args: positional arguments forwarded as ``invoke_args``.
    :returns: the ``driver`` attribute of the created ``DriverManager``.
    :raises ImportError: when stevedore raises ``NoMatches`` for the name.
    """
    namespace = 'keystone.unified_limit.model'
    try:
        return stevedore.DriverManager(
            namespace,
            driver_name,
            invoke_on_load=True,
            invoke_args=args,
        ).driver
    except stevedore.exception.NoMatches:
        # Translate the stevedore failure into the ImportError callers see.
        template = _('Unable to find %(name)r driver in %(namespace)r.')
        details = {'name': driver_name, 'namespace': namespace}
        raise ImportError(template % details)
# NOTE(review): fragment of a method body -- the enclosing ``def`` and the
# origin of ``parsed_url`` are outside this view.
# The URL scheme acts as the dispatcher name when the query string does not
# name one explicitly.
default_dispatcher = parsed_url.scheme
if default_dispatcher == 'direct':
# 'direct' is deprecated: warn and fall back to the 'database' dispatcher.
LOG.warning('Direct publisher is deprecated for removal. Use '
'an explicit publisher instead, e.g. '
'"database", "file", ...')
default_dispatcher = 'database'
# Use the last 'dispatcher' value from the query string, or the default
# computed above when the key is absent.
options = urlparse.parse_qs(parsed_url.query)
self.dispatcher_name = options.get('dispatcher',
[default_dispatcher])[-1]
self._sample_dispatcher = None
self._event_dispatcher = None
# Load the meter driver for the chosen dispatcher; a missing entry point
# (NoMatches) leaves ``sample_driver`` as None instead of raising.
try:
self.sample_driver = driver.DriverManager(
'ceilometer.dispatcher.meter', self.dispatcher_name).driver
except stevedore.exception.NoMatches:
self.sample_driver = None
# Same lookup for the event driver, with the same None fallback.
try:
self.event_driver = driver.DriverManager(
'ceilometer.dispatcher.event', self.dispatcher_name).driver
except stevedore.exception.NoMatches:
self.event_driver = None
def load_driver(namespace, driver_name, *args):
    """Load and return the driver ``driver_name`` found in ``namespace``.

    A stevedore ``DriverManager`` is created with ``invoke_on_load=True``
    and ``args`` as ``invoke_args``; its ``driver`` attribute is returned.

    :param namespace: entry point namespace to search.
    :param driver_name: entry point name of the driver to load.
    :param args: positional arguments forwarded as ``invoke_args``.
    :returns: the ``driver`` attribute of the created ``DriverManager``.
    :raises ImportError: when stevedore raises ``NoMatches`` for the name.
    """
    try:
        return stevedore.DriverManager(
            namespace, driver_name, invoke_on_load=True, invoke_args=args
        ).driver
    except stevedore.exception.NoMatches:
        # Report the failed lookup with both the name and the namespace.
        template = _('Unable to find %(name)r driver in %(namespace)r.')
        raise ImportError(
            template % {'name': driver_name, 'namespace': namespace}
        )
def API(configuration=None):
    """Return the key manager configured as ``key_manager.backend``.

    ``configuration`` (or ``cfg.CONF`` when it is falsy) first gets the
    ``key_manager`` options registered.  The backend name is then looked
    up as a stevedore driver in the ``castellan.drivers`` namespace and
    invoked with the configuration as its only positional argument.  If
    stevedore raises ``NoMatches``, a deprecation warning is logged and
    the backend name is imported as a class path instead, instantiated
    with ``configuration=conf``.

    :param configuration: configuration object to use; defaults to
        ``cfg.CONF`` when omitted or falsy.
    :returns: the loaded driver, or an instance of the imported class.
    """
    conf = configuration if configuration else cfg.CONF
    conf.register_opts(key_manager_opts, group='key_manager')

    try:
        return driver.DriverManager(
            "castellan.drivers",
            conf.key_manager.backend,
            invoke_on_load=True,
            invoke_args=[conf],
        ).driver
    except exception.NoMatches:
        # Legacy path: the backend is given as an importable class path.
        backend = conf.key_manager.backend
        LOG.warning("Deprecation Warning : %s is not a stevedore based driver,"
                    " trying to load it as a class", backend)
        backend_cls = importutils.import_class(backend)
        return backend_cls(configuration=conf)
def device_manager(device_cfg):
    """Return the switch driver matching ``device_cfg['device_type']``.

    The device type (an empty string when the key is missing) is used as
    the entry point name in ``GENERIC_SWITCH_NAMESPACE``.  The driver is
    invoked on load with ``device_cfg`` as its single positional
    argument, and ``_load_failure_hook`` is registered as the
    load-failure callback.

    :param device_cfg: mapping with the device settings; its
        ``device_type`` entry selects the driver.
    :returns: the ``driver`` attribute of the created ``DriverManager``.
    :raises GenericSwitchEntrypointLoadError: when stevedore raises
        ``NoUniqueMatch`` for the requested device type.
    """
    device_type = device_cfg.get('device_type', '')
    try:
        mgr = stevedore.driver.DriverManager(
            namespace=GENERIC_SWITCH_NAMESPACE,
            name=device_type,
            invoke_on_load=True,
            invoke_args=(device_cfg,),
            on_load_failure_callback=_load_failure_hook,
        )
    except stevedore.exception.NoUniqueMatch as exc:
        # Name the entry point as "<namespace>.<device_type>" in the error.
        entry_point = '.'.join((GENERIC_SWITCH_NAMESPACE, device_type))
        raise gsw_exc.GenericSwitchEntrypointLoadError(
            ep=entry_point, err=exc)
    else:
        return mgr.driver
# NOTE(review): fragment of a method body -- ``parsed_url`` and
# ``default_dispatcher`` are defined outside this view.
# Use the last 'dispatcher' value from the query string, or
# ``default_dispatcher`` when the key is absent.
options = urlparse.parse_qs(parsed_url.query)
self.dispatcher_name = options.get('dispatcher',
[default_dispatcher])[-1]
self._sample_dispatcher = None
self._event_dispatcher = None
# Load the meter driver for the chosen dispatcher; a missing entry point
# (NoMatches) leaves ``sample_driver`` as None instead of raising.
try:
self.sample_driver = driver.DriverManager(
'ceilometer.dispatcher.meter', self.dispatcher_name).driver
except stevedore.exception.NoMatches:
self.sample_driver = None
# Same lookup for the event driver, with the same None fallback.
try:
self.event_driver = driver.DriverManager(
'ceilometer.dispatcher.event', self.dispatcher_name).driver
except stevedore.exception.NoMatches:
self.event_driver = None