Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_one_resource(self):
    """Check aggregate statistics for one user's volume.size samples."""
    sample_filter = storage.SampleFilter(
        user='user-id',
        meter='volume.size',
    )
    stats = list(self.conn.get_meter_statistics(sample_filter))[0]
    # Duration is the span between the earliest and latest matching
    # samples (same day, so .seconds is sufficient here).
    expected_duration = (
        datetime.datetime(2012, 9, 25, 12, 32)
        - datetime.datetime(2012, 9, 25, 10, 30)
    ).seconds
    self.assertEqual(stats.duration, expected_duration)
    self.assertEqual(stats.count, 3)
    self.assertEqual(stats.unit, 'GiB')
    self.assertEqual(stats.min, 5)
    self.assertEqual(stats.max, 7)
    self.assertEqual(stats.sum, 18)
    self.assertEqual(stats.avg, 6)
def test_group_by_with_query_filter_multiple(self):
    """Group statistics by two keys while filtering on user and source."""
    sample_filter = storage.SampleFilter(
        meter='instance',
        user='user-2',
        source='source-1',
    )
    results = list(self.conn.get_meter_statistics(
        sample_filter,
        groupby=['project_id', 'resource_id']))
    self.assertEqual(len(results), 3)
    # Collect every groupby key and value seen across the result set.
    seen_keys = set()
    seen_values = set()
    for result in results:
        seen_keys.update(result.groupby.keys())
        seen_values.update(result.groupby.values())
    self.assertEqual(seen_keys, set(['project_id', 'resource_id']))
    self.assertEqual(seen_values, set(['project-1', 'project-2',
                                       'resource-1', 'resource-2']))
def test_group_by_resource(self):
# Group 'instance' statistics by resource_id and verify one
# statistics bucket per distinct resource.
f = storage.SampleFilter(
meter='instance',
)
results = list(self.conn.get_meter_statistics(f,
groupby=['resource_id']))
self.assertEqual(len(results), 3)
# Flatten the per-result groupby dicts into key and value sets.
groupby_list = [r.groupby for r in results]
groupby_keys_set = set(x for sub_dict in groupby_list
for x in sub_dict.keys())
groupby_vals_set = set(x for sub_dict in groupby_list
for x in sub_dict.values())
self.assertEqual(groupby_keys_set, set(['resource_id']))
self.assertEqual(groupby_vals_set, set(['resource-1',
'resource-2',
'resource-3']))
for r in results:
if r.groupby == {'resource_id': 'resource-1'}:
# NOTE(review): this branch has no body -- the block appears
# truncated here. The per-resource assertions that originally
# followed need to be restored from the upstream source.
def test_get_samples_by_end_time(self):
    """End-timestamp filtering honours the end_timestamp_op flag."""
    timestamp = datetime.datetime(2012, 7, 2, 10, 40)
    sample_filter = storage.SampleFilter(
        user='user-id',
        end=timestamp,
    )
    # Default operator: one sample matches.
    self.assertEqual(len(list(self.conn.get_samples(sample_filter))), 1)
    # Explicit 'lt' matches the same single sample.
    sample_filter.end_timestamp_op = 'lt'
    self.assertEqual(len(list(self.conn.get_samples(sample_filter))), 1)
    # 'le' widens the match to a second sample.
    sample_filter.end_timestamp_op = 'le'
    results = list(self.conn.get_samples(sample_filter))
    self.assertEqual(len(results), 2)
    self.assertEqual(results[1].timestamp,
                     datetime.datetime(2012, 7, 2, 10, 39))
def test_by_project(self):
    """Check statistics constrained to one resource and a time window.

    NOTE(review): despite the name, the filter selects by resource and
    time range, not project -- confirm against the upstream test suite.
    """
    sample_filter = storage.SampleFilter(
        meter='volume.size',
        resource='resource-id',
        start='2012-09-25T11:30:00',
        end='2012-09-25T11:32:00',
    )
    stats = list(self.conn.get_meter_statistics(sample_filter))[0]
    # A single sample in the window: duration collapses to zero and
    # min/max/sum/avg all equal that sample's value.
    self.assertEqual(stats.duration, 0)
    self.assertEqual(stats.count, 1)
    self.assertEqual(stats.unit, 'GiB')
    for attribute in ('min', 'max', 'sum', 'avg'):
        self.assertEqual(getattr(stats, attribute), 6)
def test_get_samples_by_metaquery(self):
    """Samples can be selected purely by a resource-metadata query."""
    q = {'metadata.display_name': 'test-server'}
    f = storage.SampleFilter(metaquery=q)
    results = list(self.conn.get_samples(f))
    # Use assertTrue rather than a bare assert: bare asserts are
    # stripped under ``python -O`` and give poorer failure output.
    self.assertTrue(results)
    for meter in results:
        # Every returned sample must be one of the published messages.
        self.assertIn(meter.as_dict(), self.msgs)
def _list_samples(meter,
project=None,
resource=None,
source=None,
user=None):
"""Return a list of raw samples.

Note: the API talks about "events"; these are equivalent to samples,
but we still need to return the samples within the "events" dict
to maintain API compatibility.
"""
# Parse start/end timestamps (and their operators) from the request.
q_ts = _get_query_timestamps(flask.request.args)
f = storage.SampleFilter(
user=user,
project=project,
source=source,
meter=meter,
resource=resource,
start=q_ts['start_timestamp'],
end=q_ts['end_timestamp'],
metaquery=_get_metaquery(flask.request.args),
)
samples = flask.request.storage_conn.get_samples(f)
# Serialized under the "events" key for backward compatibility.
jsonified = flask.jsonify(events=[s.as_dict() for s in samples])
if request_wants_html():
# NOTE(review): the render_template call is cut off here -- the
# remaining keyword arguments and the JSON return path are missing
# from this chunk; restore them from the upstream source.
return flask.templating.render_template('list_event.html',
user=user,
project=project,
def show_resources(db, args):
    """Print each user's resources with their metadata and meter totals."""
    # An explicit user list on the command line overrides the DB scan.
    users = args if args else sorted(db.get_users())
    for user in users:
        print(user)
        for resource in db.get_resources(user=user):
            print(' %(resource_id)s %(timestamp)s' % resource)
            for key, value in sorted(six.iteritems(resource['metadata'])):
                print('     %-10s : %s' % (key, value))
            for meter in resource['meter']:
                totals = db.get_statistics(storage.SampleFilter(
                    user=user,
                    meter=meter['counter_name'],
                    resource=resource['resource_id'],
                ))
                # FIXME(dhellmann): Need a way to tell whether to use
                # max() or sum() by meter name without hard-coding.
                if meter['counter_name'] in ['cpu', 'disk']:
                    total_value = totals[0]['max']
                else:
                    total_value = totals[0]['sum']
                print(' %s (%s): %s' %
                      (meter['counter_name'], meter['counter_type'],
                       total_value))
def get_meters(self, user=None, project=None, resource=None, source=None,
metaquery=None, limit=None, unique=False):
"""Return an iterable of api_models.Meter instances.

:param user: Optional ID for user that owns the resource.
:param project: Optional ID for project that owns the resource.
:param resource: Optional ID of the resource.
:param source: Optional source filter.
:param metaquery: Optional dict with metadata to match on.
:param limit: Maximum number of results to return.
:param unique: If set to true, return only unique meter information.
"""
# A zero limit means "return nothing"; short-circuit before querying.
if limit == 0:
return
s_filter = storage.SampleFilter(user=user,
project=project,
source=source,
metaquery=metaquery,
resource=resource)
# NOTE(gordc): get latest sample of each meter/resource. we do not
# filter here as we want to filter only on latest record.
session = self._engine_facade.get_session()
subq = session.query(func.max(models.Sample.id).label('id')).join(
models.Resource,
models.Resource.internal_id == models.Sample.resource_id)
if unique:
subq = subq.group_by(models.Sample.meter_id)
else:
# NOTE(review): the block is truncated here -- the non-unique
# grouping branch and the rest of the query are missing from this
# chunk; restore them from the upstream source before relying on
# this function.