Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Build a single 'volume.size' gauge sample and keep it in self.counters.
volume_metadata = {
    'display_name': 'test-volume',
    'tag': 'self.counter',
}
c = sample.Sample(
    'volume.size',
    'gauge',
    'GiB',
    5,
    'user-id',
    'project1',
    'resource-id',
    timestamp=datetime.datetime(2012, 9, 25, 10, 30),
    resource_metadata=volume_metadata,
    source='test',
)
self.counters.append(c)

# Turn the sample into a meter message using a fixed secret.
msg = rpc.meter_message_from_counter(c, secret='not-so-secret')

# NOTE: the connection is handed over explicitly as the first argument in
# this call, in addition to being the object the attribute is read from.
self.conn.record_metering_data(self.conn, msg)
# Create the old format alarm with a dict instead of a
# array for matching_metadata
alarm = dict(alarm_id='0ld-4l3rt',
enabled=True,
name='old-alert',
description='old-alert',
timestamp=None,
meter_name='cpu',
user_id='me',
project_id='and-da-boys',
comparison_operator='lt',
threshold=36,
# Record three 'volume.size' gauge samples for the same resource; the
# volume, the hour and the minute all advance by one per iteration.
for i in range(3):
    recorded_at = datetime.datetime(2012, 9, 25, 10 + i, 30 + i)
    # A fresh metadata dict is created for every sample.
    volume_metadata = {
        'display_name': 'test-volume',
        'tag': 'self.sample',
    }
    s = sample.Sample(
        'volume.size',
        'gauge',
        'GiB',
        5 + i,
        'user-id',
        'project1',
        'resource-id',
        timestamp=recorded_at,
        resource_metadata=volume_metadata,
        source='source1',
    )
    secret = self.CONF.publisher_rpc.metering_secret
    msg = rpc.meter_message_from_counter(s, secret)
    self.conn.record_metering_data(msg)
# First fixture sample: a cumulative 'instance' reading with unit
# 'instance', attributed to 'user-id' / 'project-id' / 'resource-id'.
server_metadata = {
    'display_name': 'test-server',
    'tag': 'self.sample',
}
sample1 = sample.Sample(
    'instance',
    'cumulative',
    'instance',
    1,
    'user-id',
    'project-id',
    'resource-id',
    timestamp=datetime.datetime(2012, 7, 2, 10, 40),
    resource_metadata=server_metadata,
    source='test_list_users',
)

# Convert it to a meter message with the configured metering secret and
# hand it to the storage connection.
secret = self.CONF.publisher_rpc.metering_secret
msg = rpc.meter_message_from_counter(sample1, secret)
self.conn.record_metering_data(msg)
sample2 = sample.Sample(
'instance',
'cumulative',
'',
1,
'user-id2',
'project-id',
'resource-id-alternate',
timestamp=datetime.datetime(2012, 7, 2, 10, 41),
resource_metadata={'display_name': 'test-server',
'tag': 'self.sample2',
# Store one cumulative 'instance' sample (unit 's') for every entry of
# test_sample_data; each entry is read as a mapping with the keys used below.
for test_sample in test_sample_data:
    c = sample.Sample(
        'instance',
        sample.TYPE_CUMULATIVE,
        unit='s',
        volume=test_sample['volume'],
        user_id=test_sample['user'],
        project_id=test_sample['project'],
        resource_id=test_sample['resource'],
        # The 'timestamp' entry is unpacked into datetime's positional
        # arguments.
        timestamp=datetime.datetime(*test_sample['timestamp']),
        resource_metadata={
            'flavor': test_sample['metadata_flavor'],
            'event': test_sample['metadata_event'],
        },
        source=test_sample['source'],
    )
    msg = rpc.meter_message_from_counter(
        c, self.CONF.publisher_rpc.metering_secret)
    self.conn.record_metering_data(msg)
def test_with_user(self):
counter1 = sample.Sample(
'instance',
'cumulative',
'',
1,
'user-id',
'project-id',
'resource-id',
timestamp=datetime.datetime(2012, 7, 2, 10, 40),
resource_metadata={'display_name': 'test-server',
'tag': 'self.counter',
},
source='test_list_resources',
)
msg = rpc.meter_message_from_counter(
counter1,
cfg.CONF.publisher_rpc.metering_secret,
)
self.conn.record_metering_data(msg)
counter2 = sample.Sample(
'instance',
'cumulative',
'',
1,
'user-id2',
'project-id',
'resource-id-alternate',
timestamp=datetime.datetime(2012, 7, 2, 10, 41),
resource_metadata={'display_name': 'test-server',
'tag': 'self.counter2',
def setUp(self):
super(TestListProjects, self).setUp()
sample1 = sample.Sample(
'instance',
'cumulative',
'instance',
1,
'user-id',
'project-id',
'resource-id',
timestamp=datetime.datetime(2012, 7, 2, 10, 40),
resource_metadata={'display_name': 'test-server',
'tag': 'self.sample'},
source='test_list_projects',
)
msg = rpc.meter_message_from_counter(
sample1,
self.CONF.publisher_rpc.metering_secret,
)
self.conn.record_metering_data(msg)
sample2 = sample.Sample(
'instance',
'cumulative',
'instance',
1,
'user-id2',
'project-id2',
'resource-id-alternate',
timestamp=datetime.datetime(2012, 7, 2, 10, 41),
resource_metadata={'display_name': 'test-server',
'tag': 'self.sample2'},
def test_with_invalid_resource_id(self):
counter1 = sample.Sample(
'instance',
'cumulative',
'',
1,
'user-id',
'project-id',
'resource-id-1',
timestamp=datetime.datetime(2012, 7, 2, 10, 40),
resource_metadata={'display_name': 'test-server',
'tag': 'self.counter',
},
source='test_list_resources',
)
msg = rpc.meter_message_from_counter(
counter1,
cfg.CONF.publisher_rpc.metering_secret,
)
self.conn.record_metering_data(msg)
counter2 = sample.Sample(
'instance',
'cumulative',
'',
1,
'user-id2',
'project-id',
'resource-id-2',
timestamp=datetime.datetime(2012, 7, 2, 10, 41),
resource_metadata={'display_name': 'test-server',
'tag': 'self.counter2',
def test_meter_message_from_counter_field(self):
    """Check each field of TEST_COUNTER against the generated message.

    The message is built once from ``self.TEST_COUNTER`` and, for every
    name in ``self.TEST_COUNTER._fields``, the counter attribute is
    compared with the message entry stored under the mapped key.

    The comparisons are asserted directly instead of being yielded as
    ``(callable, args...)`` tuples: a generator test method is only
    executed by runners that iterate it, and unittest-style runners just
    call the method, receive a generator object and report success
    without running a single assertion.
    """
    msg = rpc.meter_message_from_counter(self.TEST_COUNTER,
                                         'not-so-secret')
    # Counter fields whose message key differs from the attribute name;
    # every other field is looked up under its own name.
    name_map = {'name': 'counter_name',
                'type': 'counter_type',
                'unit': 'counter_unit',
                'volume': 'counter_volume'}
    for f in self.TEST_COUNTER._fields:
        msg_f = name_map.get(f, f)
        self.assertEqual(
            msg[msg_f],
            getattr(self.TEST_COUNTER, f),
            'counter field %r does not match message key %r' % (f, msg_f),
        )
),
sample.Sample(
'instance',
'cumulative',
'',
1,
'user-id2',
'project2',
'resource-id-alternate',
timestamp=datetime.datetime(2012, 7, 2, 10, 42),
resource_metadata={'display_name': 'test-server',
'tag': 'self.sample2'},
source='source1',
),
]:
msg = rpc.meter_message_from_counter(
cnt,
self.CONF.publisher_rpc.metering_secret)
self.conn.record_metering_data(msg)
# Record three 'volume.size' gauge samples, each against its own resource
# id ('resource-id-0' .. 'resource-id-2'); the volume, the hour and the
# minute all advance by one per iteration.
for i in range(3):
    recorded_at = datetime.datetime(2012, 9, 25, 10 + i, 30 + i)
    # A fresh metadata dict is created for every sample.
    volume_metadata = {
        'display_name': 'test-volume',
        'tag': 'self.sample',
    }
    s = sample.Sample(
        'volume.size',
        'gauge',
        'GiB',
        5 + i,
        'user-id',
        'project1',
        'resource-id-%s' % i,
        timestamp=recorded_at,
        resource_metadata=volume_metadata,
        source='source1',
    )
    secret = self.CONF.publisher_rpc.metering_secret
    msg = rpc.meter_message_from_counter(s, secret)
    self.conn.record_metering_data(msg)