Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_group_by_user(self):
f = storage.SampleFilter(
meter='instance',
)
results = list(self.conn.get_meter_statistics(f, groupby=['user_id']))
self.assertEqual(len(results), 3)
groupby_list = [r.groupby for r in results]
groupby_keys_set = set(x for sub_dict in groupby_list
for x in sub_dict.keys())
groupby_vals_set = set(x for sub_dict in groupby_list
for x in sub_dict.values())
self.assertEqual(groupby_keys_set, set(['user_id']))
self.assertEqual(groupby_vals_set, set(['user-1', 'user-2', 'user-3']))
for r in results:
if r.groupby == {'user_id': 'user-1'}:
self.assertEqual(r.count, 2)
self.assertEqual(r.unit, 's')
def main(argv):
cfg.CONF([], project='ceilometer')
if os.getenv("CEILOMETER_TEST_STORAGE_URL", "").startswith("hbase://"):
url = ("%s?table_prefix=%s" %
(os.getenv("CEILOMETER_TEST_STORAGE_URL"),
os.getenv("CEILOMETER_TEST_HBASE_TABLE_PREFIX", "test")))
conn = storage.get_connection(url, 'ceilometer.metering.storage')
for arg in argv:
if arg == "--upgrade":
conn.upgrade()
if arg == "--clear":
conn.clear()
def test_clear_metering_data(self):
# NOTE(jd) Override this test in MongoDB because our code doesn't clear
# the collections, this is handled by MongoDB TTL feature.
if self.CONF.database.connection.startswith('mongodb://'):
return
timeutils.utcnow.override_time = datetime.datetime(2012, 7, 2, 10, 45)
self.conn.clear_expired_metering_data(3 * 60)
f = storage.SampleFilter(meter='instance')
results = list(self.conn.get_samples(f))
self.assertEqual(len(results), 5)
results = list(self.conn.get_users())
self.assertEqual(len(results), 5)
results = list(self.conn.get_projects())
self.assertEqual(len(results), 5)
results = list(self.conn.get_resources())
self.assertEqual(len(results), 5)
url = (getattr(conf.database, "metering_connection") or
conf.database.connection)
parsed = urlparse.urlparse(url)
if parsed.password:
masked_netloc = '****'.join(parsed.netloc.rsplit(parsed.password))
masked_url = parsed._replace(netloc=masked_netloc)
masked_url = urlparse.urlunparse(masked_url)
else:
masked_url = url
LOG.info('Starting to drop event, alarm and alarm history tables in '
'backend: %s', masked_url)
connection_scheme = parsed.scheme
conn = storage.get_connection_from_config(conf)
if connection_scheme in ('mysql', 'mysql+pymysql', 'postgresql',
'sqlite'):
engine = conn._engine_facade.get_engine()
meta = sa.MetaData(bind=engine)
for table_name in ('alarm', 'alarm_history',
'trait_text', 'trait_int',
'trait_float', 'trait_datetime',
'event', 'event_type'):
if engine.has_table(table_name):
table = sa.Table(table_name, meta, autoload=True)
table.drop()
LOG.info("Legacy %s table of SQL backend has been "
"dropped.", table_name)
else:
LOG.info('%s table does not exist.', table_name)
def get_all(self, q=None, limit=None):
"""Return samples for the meter.
:param q: Filter rules for the data to be returned.
:param limit: Maximum number of samples to return.
"""
rbac.enforce('get_samples', pecan.request)
q = q or []
if limit and limit < 0:
raise ClientSideError(_("Limit must be positive"))
kwargs = _query_to_kwargs(q, storage.SampleFilter.__init__)
kwargs['meter'] = self.meter_name
f = storage.SampleFilter(**kwargs)
return [OldSample.from_db_model(e)
for e in pecan.request.storage_conn.get_samples(f, limit=limit)
]
if not aggregate:
for aggregation in Connection.STANDARD_AGGREGATES.values():
group_stage.update(
aggregation.group(version_array=self.version)
)
project_stage.update(
aggregation.project(
version_array=self.version
)
)
else:
for description in aggregate:
aggregation = Connection.AGGREGATES.get(description.func)
if aggregation:
if not aggregation.validate(description.param):
raise storage.StorageBadAggregate(
'Bad aggregate: %s.%s' % (description.func,
description.param))
group_stage.update(
aggregation.group(description.param,
version_array=self.version)
)
project_stage.update(
aggregation.project(description.param,
version_array=self.version)
)
def __init__(self, url):
# NOTE(jd) Use our own connection pooling on top of the Pymongo one.
# We need that otherwise we overflow the MongoDB instance with new
# connection since we instantiate a Pymongo client each time someone
# requires a new storage connection.
self.conn = self.CONNECTION_POOL.connect(url)
# Require MongoDB 2.4 to use $setOnInsert
if self.conn.server_info()['versionArray'] < [2, 4]:
raise storage.StorageBadVersion("Need at least MongoDB 2.4")
connection_options = pymongo.uri_parser.parse_uri(url)
self.db = getattr(self.conn, connection_options['database'])
if connection_options.get('username'):
self.db.authenticate(connection_options['username'],
connection_options['password'])
# NOTE(jd) Upgrading is just about creating index, so let's do this
# on connection to be sure at least the TTL is correctly updated if
# needed.
self.upgrade()
def get_connection(conf):
try:
return storage.get_connection_from_config(conf)
except Exception as err:
LOG.exception("Failed to connect to db" "retry later: %s",
err)
def get_meter_statistics(self, sample_filter, period=None, groupby=None,
aggregate=None):
"""Return an iterable of models.Statistics instance.
Items are containing meter statistics described by the query
parameters. The filter must have a meter value set.
"""
# NOTE(zqfan): We already have checked at API level, but
# still leave it here in case of directly storage calls.
if aggregate:
for a in aggregate:
if a.func not in self.AGGREGATES:
msg = _('Invalid aggregation function: %s') % a.func
raise storage.StorageBadAggregate(msg)
if (groupby and set(groupby) -
set(['user_id', 'project_id', 'resource_id', 'source',
'resource_metadata.instance_type'])):
raise ceilometer.NotImplementedError(
"Unable to group by these fields")
q = pymongo_utils.make_query_from_filter(sample_filter)
group_stage = {}
project_stage = {
"unit": "$_id.unit",
"name": "$_id.name",
"first_timestamp": "$first_timestamp",
"last_timestamp": "$last_timestamp",
"period_start": "$_id.period_start",
}
if a.func in STANDARD_AGGREGATES:
functions.append(STANDARD_AGGREGATES[a.func])
elif a.func in UNPARAMETERIZED_AGGREGATES:
functions.append(UNPARAMETERIZED_AGGREGATES[a.func])
elif a.func in PARAMETERIZED_AGGREGATES['compute']:
validate = PARAMETERIZED_AGGREGATES['validate'].get(a.func)
if not (validate and validate(a.param)):
raise storage.StorageBadAggregate('Bad aggregate: %s.%s'
% (a.func, a.param))
compute = PARAMETERIZED_AGGREGATES['compute'][a.func]
functions.append(compute(a.param))
else:
# NOTE(zqfan): We already have checked at API level, but
# still leave it here in case of directly storage calls.
msg = _('Invalid aggregation function: %s') % a.func
raise storage.StorageBadAggregate(msg)
return functions