Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _aggregated_field_for_split(cls, aggregation, key, version=3,
                                granularity=None):
    """Build the field name used for an aggregated split.

    The name is made of ``str(key)``, ``aggregation`` and the granularity
    expressed in seconds, joined with ``cls.FIELD_SEP``. When
    ``granularity`` is falsy, ``key.sampling`` is used instead.

    :param aggregation: Aggregation method name (used as-is in the join).
    :param key: Split key; must expose ``sampling`` when no granularity
                is given.
    :param version: Appended as ``_v<version>`` unless falsy.
    :param granularity: Optional timespan overriding ``key.sampling``.
    :return: The joined field name, with the version suffix if requested.
    """
    key_part = str(key)
    seconds_part = str(
        utils.timespan_total_seconds(granularity or key.sampling))
    field = cls.FIELD_SEP.join((key_part, aggregation, seconds_part))
    if not version:
        # A falsy version (None, 0, '') means "unversioned" name.
        return field
    return field + '_v%s' % version
def _object_name(split_key, aggregation, version=3):
    """Return the object name for a split key and aggregation.

    The layout is ``<split_key>_<aggregation>_<sampling seconds>``,
    followed by ``_v<version>`` unless ``version`` is falsy.

    :param split_key: Split key; its ``sampling`` attribute is converted
                      to seconds through ``utils.timespan_total_seconds``.
    :param aggregation: Aggregation method name.
    :param version: Suffix version, skipped when falsy.
    """
    sampling_seconds = utils.timespan_total_seconds(split_key.sampling)
    base = '%s_%s_%s' % (split_key, aggregation, sampling_seconds)
    if version:
        return base + '_v%s' % version
    return base
def _get_object_name(metric, key, aggregation, version=3):
    """Return the ``gnocchi_``-prefixed object name for a metric split.

    The layout is
    ``gnocchi_<metric.id>_<key>_<aggregation>_<sampling seconds>``,
    followed by ``_v<version>`` unless ``version`` is falsy.

    :param metric: Object exposing an ``id`` attribute.
    :param key: Split key exposing a ``sampling`` attribute.
    :param aggregation: Aggregation method name.
    :param version: Suffix version, skipped when falsy.
    """
    metric_id = metric.id
    sampling_seconds = utils.timespan_total_seconds(key.sampling)
    # Keep the explicit str() conversion of the formatted name.
    base = str("gnocchi_%s_%s_%s_%s" % (
        metric_id, key, aggregation, sampling_seconds))
    if version:
        return base + '_v%s' % version
    return base
def retrieve_data(storage_obj, metric, start, stop, window):
    """Retrieves finest-res data available from storage.

    Picks the smallest granularity among the metric's archive policy
    definitions whose duration in seconds divides the window evenly, then
    fetches the measures at that granularity.

    :param storage_obj: Object providing ``get_measures``.
    :param metric: Metric whose ``archive_policy.definition`` is scanned.
    :param start: Passed through to ``get_measures``.
    :param stop: Passed through to ``get_measures``.
    :param window: Timespan the chosen granularity must factor into.
    :return: Tuple ``(granularity, pandas.Series)``; the series is empty
             when no measures are returned.
    :raises deprecated_aggregates.CustomAggFailure: when no granularity
            factors into the window.
    """
    window_seconds = utils.timespan_total_seconds(window)
    try:
        usable = []
        for ap in metric.archive_policy.definition:
            ap_seconds = utils.timespan_total_seconds(ap.granularity)
            if window_seconds % ap_seconds == 0:
                usable.append(ap.granularity)
        # min() raises ValueError on an empty list, i.e. no match at all.
        min_grain = min(usable)
    except ValueError:
        raise deprecated_aggregates.CustomAggFailure(
            "No data available that is either full-res or "
            "of a granularity that factors into the window size "
            "you specified.")

    measures = storage_obj.get_measures(metric, start, stop,
                                        granularity=min_grain)
    # Transpose the measure rows into columns.
    columns = list(zip(*measures))
    if columns:
        # Third column holds the data, first column is used as the index.
        series = pandas.Series(columns[2], columns[0])
    else:
        series = pandas.Series()
    return (min_grain, series)
def _object_name(split_key, aggregation, version=3):
    """Return the object name for a split key and aggregation.

    The layout is ``<aggregation>_<sampling seconds>_<split_key>``,
    followed by ``_v<version>`` unless ``version`` is falsy.

    :param split_key: Split key; its ``sampling`` attribute is converted
                      to seconds through ``utils.timespan_total_seconds``.
    :param aggregation: Aggregation method name.
    :param version: Suffix version, skipped when falsy.
    """
    sampling_seconds = utils.timespan_total_seconds(split_key.sampling)
    base = '%s_%s_%s' % (aggregation, sampling_seconds, split_key)
    if version:
        return base + '_v%s' % version
    return base
def _handle_binary_op(cls, engine, table, op, nodes):
try:
field_name, value = list(nodes.items())[0]
except Exception:
raise indexer.QueryError()
if field_name == "lifespan":
attr = getattr(table, "ended_at") - getattr(table, "started_at")
value = datetime.timedelta(
seconds=utils.timespan_total_seconds(
utils.to_timespan(value)))
if engine == "mysql":
# NOTE(jd) So subtracting 2 timestamps in MySQL result in some
# weird results based on string comparison. It's useless and it
# does not work at all with seconds or anything. Just skip it.
raise exceptions.NotImplementedError
elif field_name == "created_by_user_id":
creator = getattr(table, "creator")
if op == operator.eq:
return creator.like("%s:%%" % value)
elif op == operator.ne:
return sqlalchemy.not_(creator.like("%s:%%" % value))
elif op == cls.binary_operators[u"like"]:
return creator.like("%s:%%" % value)
raise indexer.QueryValueError(value, field_name)
elif field_name == "created_by_project_id":
def serialize(self):
    """Return a dict representation of this object.

    ``timespan`` and ``granularity`` are converted to a float number of
    seconds; ``timespan`` stays ``None`` when it is not set. ``points`` is
    returned unchanged.
    """
    if self.timespan is None:
        timespan_seconds = None
    else:
        timespan_seconds = float(
            utils.timespan_total_seconds(self.timespan))
    granularity_seconds = float(
        utils.timespan_total_seconds(self.granularity))
    return {
        'timespan': timespan_seconds,
        'granularity': granularity_seconds,
        'points': self.points,
    }