Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.rebuild_schema_manager = \
stevedore.enabled.EnabledExtensionManager(
namespace=self.EXTENSION_REBUILD_NAMESPACE,
check_func=_check_load_extension('get_server_rebuild_schema'),
invoke_on_load=True,
invoke_kwds={"extension_info": self.extension_info},
propagate_map_exceptions=True)
if list(self.rebuild_schema_manager):
self.rebuild_schema_manager.map(self._rebuild_extension_schema,
self.schema_server_rebuild)
else:
LOG.debug("Did not find any server rebuild schemas")
# Look for API schema of server resize extension
self.resize_schema_manager = \
stevedore.enabled.EnabledExtensionManager(
namespace=self.EXTENSION_RESIZE_NAMESPACE,
check_func=_check_load_extension('get_server_resize_schema'),
invoke_on_load=True,
invoke_kwds={"extension_info": self.extension_info},
propagate_map_exceptions=True)
if list(self.resize_schema_manager):
self.resize_schema_manager.map(self._resize_extension_schema,
self.schema_server_resize)
else:
LOG.debug("Did not find any server resize schemas")
def __init__(self, init_only=None):
    """Load the API extensions and pass the populated mapper to the parent.

    ``init_only`` is accepted for interface compatibility; this method
    does not read it.
    """
    self.api_extension_manager = stevedore.enabled.EnabledExtensionManager(
        namespace=self.api_extension_namespace(),
        # An extension is only loaded when _register_extension() accepts it.
        check_func=lambda ext: self._register_extension(ext),
        invoke_on_load=True,
        invoke_kwds={"extension_info": self.loaded_extension_info})

    mapper = ProjectMapper()
    self.resources = {}

    # Only touch the manager's map() when at least one extension was
    # loaded (presumably because map() on an empty manager raises
    # NoMatches -- confirm against the stevedore version in use).
    has_extensions = bool(list(self.api_extension_manager))
    if has_extensions:
        self._register_resources_check_inherits(mapper)
        self.api_extension_manager.map(self._register_controllers)

    loaded_names = sorted(
        self.loaded_extension_info.get_extensions().keys())
    LOG.info("Loaded extensions: %s", loaded_names)

    super(APIRouterV1, self).__init__(mapper)
def _get_plugins(self):
"""Return dict of resource-plugin class pairs."""
config_plugins = CONF.manager.plugins
plugins = {}
extension_manager = enabled.EnabledExtensionManager(
check_func=lambda ext: ext.name in config_plugins,
namespace='blazar.resource.plugins',
invoke_on_load=False
)
invalid_plugins = (set(config_plugins) -
set([ext.name for ext
in extension_manager.extensions]))
if invalid_plugins:
raise common_ex.BlazarException('Invalid plugin names are '
'specified: %s' % invalid_plugins)
for ext in extension_manager.extensions:
try:
plugin_obj = ext.plugin()
except Exception as e:
def get_extensions(cls, enabled_extensions=None):
    """Load the extensions registered under ``cls.__plugin_ns__``.

    :param enabled_extensions: optional collection of plugin names. When
        ``None`` every extension found in the namespace is accepted;
        otherwise only extensions whose ``plugin.get_plugin_name()`` is
        contained in it are kept.
    :returns: list with the ``plugin`` attribute of each accepted
        extension.
    """
    # Pass the namespace as a logger argument so the message is only
    # formatted when debug logging is actually enabled.
    LOG.debug('Looking for extensions in %s', cls.__plugin_ns__)

    def _check_func(ext):
        if enabled_extensions is None:
            # All extensions are enabled by default, if no specific list
            # is specified
            return True
        return ext.plugin.get_plugin_name() in enabled_extensions

    mgr = enabled.EnabledExtensionManager(
        cls.__plugin_ns__, check_func=_check_func,
        propagate_map_exceptions=True)

    return [e.plugin for e in mgr]
'%s extension %r disabled through configuration setting',
namespace, ext.name,
)
return False
if not ext.obj.is_enabled():
LOG.debug(
'%s extension %r reported that it is disabled',
namespace,
ext.name,
)
return False
LOG.debug('using %s extension %r', namespace, ext.name)
return True
class ActivatedExtensionManager(enabled.EnabledExtensionManager):
"""Loads extensions based on a configurable set that should be
disabled and asking each one if it should be active or not.
"""
def __init__(self, namespace, disabled_names, invoke_on_load=True,
invoke_args=(), invoke_kwds={}):
def local_check_func(ext):
return should_use_extension(namespace, ext, disabled_names)
super(ActivatedExtensionManager, self).__init__(
namespace=namespace,
check_func=local_check_func,
invoke_on_load=invoke_on_load,
invoke_args=invoke_args,
invoke_kwds=invoke_kwds,
def _load_cluster_plugins(self):
config_plugins = CONF.plugins
extension_manager = enabled.EnabledExtensionManager(
check_func=lambda ext: ext.name in config_plugins,
namespace='sahara.cluster.plugins',
invoke_on_load=True
)
for ext in extension_manager.extensions:
if ext.name in self.plugins:
raise ex.ConfigurationError(
_("Plugin with name '%s' already exists.") % ext.name)
ext.obj.name = ext.name
self.plugins[ext.name] = ext.obj
LOG.info("Plugin {plugin_name} loaded {entry_point}".format(
plugin_name=ext.name,
entry_point=ext.entry_point_target))
if len(self.plugins) < len(config_plugins):
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from stevedore import enabled
class EnabledExtensionManager(enabled.EnabledExtensionManager):
"""CloudKitty Rating processor manager
Override default EnabledExtensionManager to check for an internal
object property in the extension.
"""
def __init__(self, namespace, invoke_args=(), invoke_kwds={}):
def check_enabled(ext):
"""Check if extension is enabled.
"""
return ext.obj.enabled
super(EnabledExtensionManager, self).__init__(
namespace=namespace,
def __init__(self, migration_config, engine=None):
    """Create the migration plugin manager.

    :param migration_config: mapping of migration settings; its
        ``'db_url'`` entry is used to build an engine when ``engine`` is
        not supplied. It is also forwarded to every plugin.
    :param engine: optional engine handed to the plugins. When ``None``
        one is created from ``migration_config['db_url']``.
    :raises ValueError: if neither an engine nor a database url is
        available, or if no plugin ends up active.
    """
    if engine is None:
        db_url = migration_config.get('db_url')
        if not db_url:
            raise ValueError(
                'Either database url or engine must be provided.')
        engine = sqlalchemy.create_engine(
            db_url,
            poolclass=sqlalchemy.pool.NullPool,
        )

    # Plugins are instantiated on load with (engine, migration_config).
    plugin_args = (engine, migration_config)
    self._manager = enabled.EnabledExtensionManager(
        MIGRATION_NAMESPACE,
        check_plugin_enabled,
        invoke_args=plugin_args,
        invoke_on_load=True
    )

    if not self._plugins:
        raise ValueError('There must be at least one plugin active.')
def _load_drivers(self):
    """Load the module drivers and register each via add_driver_extension.

    Extensions are looked up in ``self.MODULE_DRIVER_NAMESPACE`` and
    filtered through ``self._check_extension``. If the manager reports
    that nothing matched, this is logged and otherwise ignored.
    """
    driver_manager = stevedore.enabled.EnabledExtensionManager(
        namespace=self.MODULE_DRIVER_NAMESPACE,
        check_func=self._check_extension,
        invoke_on_load=True,
        invoke_kwds={},
    )

    try:
        driver_manager.map(self.add_driver_extension)
    except stevedore.exception.NoMatches:
        # Having no drivers is not treated as an error here.
        LOG.info("No module drivers loaded")
def initialise():
    """Create the module-level client plugin manager once.

    Does nothing when ``_mgr`` is already set to a truthy value.
    """
    global _mgr
    if _mgr:
        return

    def client_is_available(client_plugin):
        """Tell the manager whether this client plugin should be kept."""
        plugin = client_plugin.plugin
        if hasattr(plugin, 'is_available'):
            # let the client plugin decide if it wants to register or not
            return plugin.is_available()
        # a client without an is_available() class method is assumed to
        # want to be always available
        return True

    _mgr = enabled.EnabledExtensionManager(
        namespace='heat.clients',
        check_func=client_is_available,
        invoke_on_load=False,
        on_load_failure_callback=pluginutils.log_fail_msg,
    )