Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
- my_event
rpc2:
accepted_priorities:
- info
bar:
accepted_events:
- nothing
"""
config_file = mock.MagicMock()
config_file.return_value = routing_config
rpc_driver = mock.Mock()
rpc2_driver = mock.Mock()
bar_driver = mock.Mock()
pm = dispatch.DispatchExtensionManager.make_test_instance(
[extension.Extension('rpc', None, None, rpc_driver),
extension.Extension('rpc2', None, None, rpc2_driver),
extension.Extension('bar', None, None, bar_driver)],
)
with mock.patch.object(self.router, '_get_notifier_config_file',
config_file):
with mock.patch('stevedore.dispatch.DispatchExtensionManager',
return_value=pm):
self.notifier.info({}, 'my_event', {})
self.assertFalse(bar_driver.info.called)
rpc_driver.notify.assert_called_once_with(
{}, mock.ANY, 'INFO', None)
rpc2_driver.notify.assert_called_once_with(
{}, mock.ANY, 'INFO', None)
def __init__(self, conf):
    """Initialise the dispatcher from the service configuration.

    Reads the ``dispatcher_gnocchi`` option group of ``conf``, obtains a
    keystone client, loads the archive policy data and builds the
    stevedore extension manager for the ``gnocchi.ceilometer.resource``
    namespace.

    :param conf: Configuration object exposing a ``dispatcher_gnocchi``
                 group with ``filter_service_activity``, ``url`` and
                 ``archive_policy`` options.
    """
    super(GnocchiDispatcher, self).__init__(conf)
    self.conf = conf

    # All gnocchi-specific options live in one group; look it up once.
    gnocchi_opts = conf.dispatcher_gnocchi

    self.filter_service_activity = gnocchi_opts.filter_service_activity
    self._ks_client = utils.get_keystone_client()
    self.gnocchi_url = gnocchi_opts.url
    self.gnocchi_archive_policy_default = gnocchi_opts.archive_policy
    self.gnocchi_archive_policy_data = self._load_archive_policy(conf)

    def _accept_every_extension(_extension):
        # The check function never rejects anything, so every entry
        # point found in the namespace is loaded.
        return True

    self.mgmr = stevedore.dispatch.DispatchExtensionManager(
        'gnocchi.ceilometer.resource', _accept_every_extension,
        invoke_on_load=True)

    # Both cached values start unset, each paired with its own lock.
    # NOTE(review): presumably filled in on first use while holding the
    # matching lock -- confirm against the accessors elsewhere in the class.
    self._gnocchi_project_id = None
    self._gnocchi_api = None
    self._gnocchi_project_id_lock = threading.Lock()
    self._gnocchi_api_lock = threading.Lock()
filename = CONF.oslo_messaging_notifications.routing_config
if not filename:
return
# Infer which drivers are used from the config file.
self.routing_groups = yaml.safe_load(
self._get_notifier_config_file(filename))
if not self.routing_groups:
self.routing_groups = {} # In case we got None from load()
return
for group in self.routing_groups.values():
self.used_drivers.update(group.keys())
LOG.debug('loading notifiers from %s', self.NOTIFIER_PLUGIN_NAMESPACE)
self.plugin_manager = dispatch.DispatchExtensionManager(
namespace=self.NOTIFIER_PLUGIN_NAMESPACE,
check_func=self._should_load_plugin,
invoke_on_load=True,
invoke_args=None)
if not list(self.plugin_manager):
LOG.warning("Failed to load any notifiers for %s",
self.NOTIFIER_PLUGIN_NAMESPACE)
otherwise they are logged and ignored.
.. versionadded:: 0.12
:param filter_func: Callable to test each extension.
:param method_name: The extension method name to call
for each extension.
:param args: Variable arguments to pass to method
:param kwds: Keyword arguments to pass to method
:returns: List of values returned from methods
"""
return self.map(filter_func, self._call_extension_method,
method_name, *args, **kwds)
class NameDispatchExtensionManager(DispatchExtensionManager):
"""Loads all plugins and filters on execution.
This is useful for long-running processes that need to pass
different inputs to different extensions and can predict the name
of the extensions before calling them.
The check_func argument should return a boolean, with ``True``
indicating that the extension should be loaded and made available
and ``False`` indicating that the extension should be ignored.
:param namespace: The namespace for the entry points.
:type namespace: str
:param check_func: Function to determine which extensions to load.
:type check_func: callable
:param invoke_on_load: Boolean controlling whether to invoke the
object returned by the entry point after the driver is loaded.