Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def create_extension_manager(self):
return extension_tests.TestExtensionManager(
[
extension.Extension(
'test',
None,
None,
self.Pollster(), ),
extension.Extension(
'testanother',
None,
None,
self.PollsterAnother(), ),
extension.Extension(
'testexception',
None,
None,
self.PollsterException(), ),
def setUp(self):
super(TestPartitionedAlarmService, self).setUp()
self.threshold_eval = mock.Mock()
self.api_client = mock.MagicMock()
self.CONF = self.useFixture(config.Config()).conf
self.CONF.set_override('host',
'fake_host')
self.CONF.set_override('partition_rpc_topic',
'fake_topic',
group='alarm')
self.partitioned = service.PartitionedAlarmService()
self.partitioned.tg = mock.Mock()
self.partitioned.partition_coordinator = mock.Mock()
self.extension_mgr = extension_tests.TestExtensionManager(
[
extension.Extension(
'threshold',
None,
None,
self.threshold_eval, ),
])
self.partitioned.extension_manager = self.extension_mgr
def test_message_to_event_duplicate(self):
self.CONF.set_override("store_events", True, group="collector")
mock_dispatcher = mock.MagicMock()
self.srv.dispatcher_manager = test_manager.TestExtensionManager(
[extension.Extension('test',
None,
None,
mock_dispatcher
),
])
mock_dispatcher.record_events.return_value = [
(models.Event.DUPLICATE, object())]
message = {'event_type': "foo", 'message_id': "abc"}
self.srv._message_to_event(message) # Should return silently.
def test_udp_receive(self):
mock_dispatcher = mock.MagicMock()
self.srv.dispatcher_manager = test_manager.TestExtensionManager(
[extension.Extension('test',
None,
None,
mock_dispatcher
),
])
self.counter['source'] = 'mysource'
self.counter['counter_name'] = self.counter['name']
self.counter['counter_volume'] = self.counter['volume']
self.counter['counter_type'] = self.counter['type']
self.counter['counter_unit'] = self.counter['unit']
with patch('socket.socket', self._make_fake_socket):
self.srv.start()
mock_dispatcher.record_metering_data.assert_called_once_with(
def setUp(self):
super(TestSingletonAlarmService, self).setUp()
self.threshold_eval = mock.Mock()
self.evaluators = extension_tests.TestExtensionManager(
[
extension.Extension(
'threshold',
None,
None,
self.threshold_eval),
])
self.api_client = mock.MagicMock()
self.singleton = service.SingletonAlarmService()
self.singleton.tg = mock.Mock()
self.singleton.evaluators = self.evaluators
self.singleton.supported_evaluators = ['threshold']
def test_message_to_event_bad_event(self):
self.CONF.set_override("store_events", True, group="collector")
mock_dispatcher = mock.MagicMock()
self.srv.dispatcher_manager = test_manager.TestExtensionManager(
[extension.Extension('test',
None,
None,
mock_dispatcher
),
])
mock_dispatcher.record_events.return_value = [
(models.Event.UNKNOWN_PROBLEM, object())]
message = {'event_type': "foo", 'message_id': "abc"}
self.assertRaises(service.UnableToSaveEventException,
self.srv._message_to_event, message)
def test_udp_receive_storage_error(self):
mock_dispatcher = mock.MagicMock()
self.srv.dispatcher_manager = test_manager.TestExtensionManager(
[extension.Extension('test',
None,
None,
mock_dispatcher
),
])
mock_dispatcher.record_metering_data.side_effect = self._raise_error
self.counter['source'] = 'mysource'
self.counter['counter_name'] = self.counter['name']
self.counter['counter_volume'] = self.counter['volume']
self.counter['counter_type'] = self.counter['type']
self.counter['counter_unit'] = self.counter['unit']
with patch('socket.socket', self._make_fake_socket):
self.srv.start()
def test_message_to_event_missing_keys(self):
now = timeutils.utcnow()
timeutils.set_time_override(now)
message = {'event_type': "foo",
'message_id': "abc",
'publisher_id': "1"}
mock_dispatcher = mock.MagicMock()
self.srv.dispatcher_manager = test_manager.TestExtensionManager(
[extension.Extension('test',
None,
None,
mock_dispatcher
),
])
with patch('ceilometer.collector.service.LOG') as mylog:
self.srv._message_to_event(message)
self.assertFalse(mylog.exception.called)
events = mock_dispatcher.record_events.call_args[0]
self.assertEqual(1, len(events))
event = events[0]
self.assertEqual("foo", event.event_name)
self.assertEqual(now, event.generated)
self.assertEqual(1, len(event.traits))
def test_process_notification(self):
# If we try to create a real RPC connection, init_host() never
# returns. Mock it out so we can establish the service
# configuration.
self.CONF.set_override("store_events", False, group="collector")
with patch('ceilometer.openstack.common.rpc.create_connection'):
self.srv.start()
self.srv.pipeline_manager.pipelines[0] = mock.MagicMock()
self.srv.notification_manager = test_manager.TestExtensionManager(
[extension.Extension('test',
None,
None,
notifications.Instance(),
),
])
self.srv.process_notification(TEST_NOTICE)
self.assertTrue(
self.srv.pipeline_manager.publisher.called)