Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _load_driver(namespace, name):
    """Load a single stevedore driver and return the loaded driver object.

    :param namespace: entry-point namespace to look the driver up in.
    :param name: name of the entry point to load from that namespace.
    :return: the ``driver`` attribute of the created ``DriverManager``.
    """
    # invoke_on_load=True asks stevedore to call the plugin while loading it.
    manager = stevedore.DriverManager(namespace=namespace, name=name,
                                      invoke_on_load=True)
    loaded_message = "Driver {name} successfully loaded".format(name=name)
    LOG.info(loaded_message)
    return manager.driver
# Register the 'createadminuser' command with the app's script manager.
app.script_manager.add_command('createadminuser',
auth_forms.CreateAdminUser)
# Flask-Login, for Admin users.
app.admin_login_manager = flask_login.LoginManager(app)
# Set up callback to load user objects.
# The callback fetches the AdminUser row whose primary key is `user_id`.
app.admin_login_manager.user_loader(
lambda user_id: db.session.query(models.AdminUser).get(user_id))
# Use stevedore to load drivers/extensions.
# Discover all drivers/extensions, but do not load any.
# Used for logging found extensions.
app.ext_mgr_all = stevedore.ExtensionManager(namespace='pg_discuss.ext')
# Load configured IdentityPolicy driver
# NOTE(review): invoke_on_load is not passed here, so `.driver` is presumably
# the plugin itself rather than an instance - confirm against stevedore docs.
app.identity_policy_loader = stevedore.DriverManager(
namespace='pg_discuss.ext',
name=app.config['DRIVER_IDENTITY_POLICY'],
)
# Initialize IdentityPolicyManager with the configured IdentityPolicy driver
app.identity_policy_mgr = identity.IdentityPolicyManager(
app,
app.identity_policy_loader.driver,
)
# Load configured CommentRenderer driver
app.comment_renderer_loader = stevedore.DriverManager(
namespace='pg_discuss.ext',
name=app.config['DRIVER_COMMENT_RENDERER'],
)
# Call the loaded driver with the app to build the comment renderer.
app.comment_renderer = app.comment_renderer_loader.driver(app)
def get_handler(self, name: str) -> Callable[..., EventHandler]:
    """Look up a handler class by its registered name.

    Handler classes are made available through the ``e3.event.handler``
    entry_points declared in your setup.py.

    :param name: handler name
    :return: the handler class registered under that name
    """
    handler_plugin = stevedore.DriverManager("e3.event.handler", name)
    return handler_plugin.driver
def load_store(name: str, configuration: Any, cache: Cache) -> Store:
    """Build the store plugin registered as ``name``.

    The plugin is looked up in the ``e3.store`` namespace and then called
    with the given configuration and cache.

    :param name: entry-point name of the store plugin.
    :param configuration: passed as the first argument to the plugin.
    :param cache: passed as the second argument to the plugin.
    :return: whatever the plugin returns when called.
    """
    store_factory = stevedore.DriverManager("e3.store", name).driver
    return store_factory(configuration, cache)
def load_driver(driver_name, *args):
    """Load the named driver from the unified limit model namespace.

    :param driver_name: entry-point name of the driver to load.
    :param args: positional arguments forwarded to the driver via
        ``invoke_args`` when it is invoked on load.
    :return: the ``driver`` attribute of the created ``DriverManager``.
    :raises ImportError: if stevedore reports no matching entry point.
    """
    namespace = 'keystone.unified_limit.model'
    try:
        manager = stevedore.DriverManager(
            namespace, driver_name, invoke_on_load=True, invoke_args=args)
        return manager.driver
    except stevedore.exception.NoMatches:
        # Report a missing entry point as an ImportError to the caller.
        details = {'name': driver_name, 'namespace': namespace}
        template = _('Unable to find %(name)r driver in %(namespace)r.')
        raise ImportError(template % details)
def load_cache(name: str = "file-cache", configuration: Any = None) -> Cache:
    """Build the cache backend plugin registered as ``name``.

    The plugin is looked up in the ``e3.store.cache.backend`` namespace and
    then called with the given configuration.

    :param name: entry-point name of the cache backend.
    :param configuration: passed as the only argument to the plugin.
    :return: whatever the plugin returns when called.
    """
    cache_factory = stevedore.DriverManager(
        namespace="e3.store.cache.backend", name=name
    ).driver
    return cache_factory(configuration)
def node_not_found_hook_manager(*args):
    """Return the module-level node-not-found hook manager, creating it lazily.

    The manager is built on first use from the
    ``CONF.processing.node_not_found_hook`` option and stored in the
    ``_NOT_FOUND_HOOK_MGR`` global. When that option is falsy nothing is
    created and the global (still ``None``) is returned.

    :param args: accepted but not used by this function.
    :return: the stored ``DriverManager``, or ``None`` if none was created.
    """
    global _NOT_FOUND_HOOK_MGR
    # Fast path: a manager was already created by an earlier call.
    if _NOT_FOUND_HOOK_MGR is not None:
        return _NOT_FOUND_HOOK_MGR

    hook_name = CONF.processing.node_not_found_hook
    if hook_name:
        _NOT_FOUND_HOOK_MGR = stevedore.DriverManager(
            'ironic_inspector.hooks.node_not_found', name=hook_name)
    return _NOT_FOUND_HOOK_MGR