Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _fake_extension_manager(self, ext):
    """Return a test ExtensionManager holding one extension named 'test'.

    ``ext`` is handed to ``extension.Extension`` as its fourth positional
    argument; the second and third positional arguments are ``None``.
    """
    wrapped = extension.Extension('test', None, None, ext)
    return extension.ExtensionManager.make_test_instance([wrapped])
def get_insert_pre_hourly_cmpt_mgr():
    """Return a test ExtensionManager with three insert extensions.

    The extensions are registered, in order, as 'prepare_data',
    'insert_data' and 'insert_data_pre_hourly'. Each one is built from a
    freshly created instance of its component class, passed as the third
    positional argument of ``Extension``; the fourth argument is ``None``.
    """
    # (extension name, entry point string, component class) per extension.
    component_specs = (
        ('prepare_data',
         'monasca_transform.component.insert.prepare_data:PrepareData',
         PrepareData),
        ('insert_data',
         'tests.functional.component.insert.dummy_insert:'
         'DummyInsert',
         DummyInsert),
        ('insert_data_pre_hourly',
         'tests.functional.component.insert.'
         'dummy_insert_pre_hourly:'
         'DummyInsertPreHourly',
         DummyInsertPreHourly),
    )
    extensions = [
        Extension(ext_name, entry_point, component_cls(), None)
        for ext_name, entry_point, component_cls in component_specs
    ]
    return ExtensionManager.make_test_instance(extensions)
# Fragment of a test set-up routine (the enclosing definition is not visible
# here). It swaps several ``db`` / ``flavors`` attributes for test stand-ins
# through ``self.stubs.Set`` and then wires a gatherer into nova_notifier.
self.stubs.Set(db, 'instance_info_cache_delete', self.do_nothing)
self.stubs.Set(db, 'instance_destroy', self.do_nothing)
self.stubs.Set(db, 'instance_system_metadata_get',
self.fake_db_instance_system_metadata_get)
# Stand-in that ignores its arguments and always returns an empty dict.
self.stubs.Set(db, 'block_device_mapping_get_all_by_instance',
lambda context, instance: {})
# Stand-in that returns ``self.instance`` for both elements of the pair.
self.stubs.Set(db, 'instance_update_and_get_original',
lambda *args, **kwargs: (self.instance, self.instance))
self.stubs.Set(flavors, 'extract_flavor', self.fake_extract_flavor)
# Set up to capture the notification messages generated by the
# plugin and to invoke our notifier plugin.
self.notifications = []
# Test extension manager with a single extension named 'test'; a new
# ``self.Pollster()`` instance is its fourth positional argument.
ext_mgr = extension.ExtensionManager.make_test_instance(
extensions=[
extension.Extension('test',
None,
None,
self.Pollster(),
),
],
)
# Keep the manager on the test instance and build the gatherer from it.
self.ext_mgr = ext_mgr
self.gatherer = nova_notifier.DeletedInstanceStatsGatherer(ext_mgr)
# Initialize the global _gatherer in nova_notifier to use the
# gatherer in this test instead of the gatherer in nova_notifier.
nova_notifier.initialize_gatherer(self.gatherer)
# Terminate the instance to trigger the notification.
with contextlib.nested(
def _make_test_manager(self, plugin):
    """Return a test ExtensionManager with one 'test_plugin' extension.

    The extension is created with ``test_plugin`` as its third positional
    argument; the second and fourth arguments are ``None``.
    """
    # NOTE(review): the ``plugin`` parameter is never used here; the name
    # ``test_plugin`` (not defined in this block) is passed instead.
    # Confirm whether ``plugin`` was the intended argument.
    single_extension = extension.Extension(
        'test_plugin', None, test_plugin, None)
    return extension.ExtensionManager.make_test_instance([single_extension])
def get_setter_cmpt_mgr():
    """Return a test ExtensionManager with three setter extensions.

    The extensions are registered, in order, as
    'set_aggregated_metric_name', 'set_aggregated_period' and
    'rollup_quantity'. Each one is built from a freshly created instance of
    its component class, passed as the third positional argument of
    ``Extension``; the fourth argument is ``None``.
    """
    # (extension name, entry point string, component class) per extension.
    component_specs = (
        ('set_aggregated_metric_name',
         'monasca_transform.component.setter.'
         'set_aggregated_metric_name:SetAggregatedMetricName',
         SetAggregatedMetricName),
        ('set_aggregated_period',
         'monasca_transform.component.setter.'
         'set_aggregated_period:SetAggregatedPeriod',
         SetAggregatedPeriod),
        ('rollup_quantity',
         'monasca_transform.component.setter.'
         'rollup_quantity:RollupQuantity',
         RollupQuantity),
    )
    extensions = [
        Extension(ext_name, entry_point, component_cls(), None)
        for ext_name, entry_point, component_cls in component_specs
    ]
    return ExtensionManager.make_test_instance(extensions)
def get_usage_cmpt_mgr():
return ExtensionManager.make_test_instance([Extension(
'fetch_quantity',
'monasca_transform.component.usage.'
'fetch_quantity:'
'FetchQuantity',
FetchQuantity(),
None),
Extension(
'fetch_quantity_util',
'monasca_transform.component.usage.'
'fetch_quantity_util:'
'FetchQuantityUtil',
FetchQuantityUtil(),
None),
Extension(
'calculate_rate',
'monasca_transform.component.usage.'
def _empty_extension_manager(self):
    """Return a test ExtensionManager that is given no extensions."""
    no_extensions = []
    return extension.ExtensionManager.make_test_instance(no_extensions)