Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(sileht): The oslo.config interpolation is buggy when the value
# is None, this replaces it by the expected empty string.
# This will perhaps be fixed by https://review.openstack.org/#/c/417496/
# But it seems some projects are relying on the bug...
class CustomStrSubWrapper(cfg.ConfigOpts.StrSubWrapper):
    """String-substitution wrapper that maps ``None`` values to ``''``.

    Lookups are delegated to the parent wrapper; only a ``None`` result
    is rewritten, every other value is returned untouched.
    """

    def __getitem__(self, key):
        """Return the parent's value for ``key``, or ``''`` if it is None."""
        resolved = super(CustomStrSubWrapper, self).__getitem__(key)
        return '' if resolved is None else resolved
# Install the None-tolerant wrapper in place of the oslo.config one.
cfg.ConfigOpts.StrSubWrapper = CustomStrSubWrapper

# Flat list of the option definitions exposed by each storage backend.
_STORAGE_OPTS = list(itertools.chain.from_iterable((
    gnocchi.storage.OPTS,
    gnocchi.storage.ceph.OPTS,
    gnocchi.storage.file.OPTS,
    gnocchi.storage.swift.OPTS,
    gnocchi.common.redis.OPTS,
    gnocchi.storage.s3.OPTS,
)))

# The incoming options are deep copies of the storage ones, so changing
# their defaults below leaves the storage option objects untouched. Each
# copy defaults to an interpolation of the storage option of the same name.
_INCOMING_OPTS = copy.deepcopy(_STORAGE_OPTS)
for opt in _INCOMING_OPTS:
    storage_reference = '${storage.%s}' % opt.name
    opt.default = storage_reference
API_OPTS = (
cfg.HostAddressOpt('host',
default="0.0.0.0",
help="Host to listen on"),
with self.BACKEND_LOCKS[name]:
# Recheck, it may have been created in the meantime.
if name not in self.backends:
if name == "coordinator":
# NOTE(jd) This coordinator is never stopped. I don't
# think it's a real problem since the Web app can never
# really be stopped anyway, except by quitting it
# entirely.
self.backends[name] = (
metricd.get_coordinator_and_start(
str(uuid.uuid4()).encode(),
self.conf.coordination_url)
)
elif name == "storage":
self.backends[name] = (
gnocchi_storage.get_driver(self.conf)
)
elif name == "incoming":
self.backends[name] = (
gnocchi_incoming.get_driver(self.conf)
)
elif name == "indexer":
self.backends[name] = (
gnocchi_indexer.get_driver(self.conf)
)
else:
raise RuntimeError("Unknown driver %s" % name)
return self.backends[name]
def _get_measures_timeserie(self, metric,
aggregation, granularity,
from_timestamp=None, to_timestamp=None):
# Find the number of points
for d in metric.archive_policy.definition:
if d.granularity == granularity:
points = d.points
break
else:
raise storage.GranularityDoesNotExist(metric, granularity)
all_keys = None
try:
all_keys = self._list_split_keys_for_metric(
metric, aggregation, granularity)
except storage.MetricDoesNotExist:
for d in metric.archive_policy.definition:
if d.granularity == granularity:
return carbonara.AggregatedTimeSerie(
sampling=granularity,
aggregation_method=aggregation,
max_size=d.points)
raise storage.GranularityDoesNotExist(metric, granularity)
if from_timestamp:
from_timestamp = carbonara.SplitKey.from_timestamp_and_sampling(
def _configure(self):
    """Create the coordinator, the drivers and the chef for this worker.

    Sets ``self.coord``, ``self.store``, ``self.incoming``, ``self.index``
    and ``self.chef`` from ``self.conf`` and ``self.worker_id``.
    """
    hostname = socket.gethostname()
    worker = self.worker_id
    # NOTE(jd) Still use a uuid here so we're sure there's no conflict
    # in case of crash/restart
    unique_suffix = str(uuid.uuid4())
    member_name = "%s.%s.%s" % (hostname, worker, unique_suffix)

    self.coord = get_coordinator_and_start(
        member_name.encode(), self.conf.coordination_url)

    # Drivers are all built from the same configuration object.
    self.store = storage.get_driver(self.conf)
    self.incoming = incoming.get_driver(self.conf)
    self.index = indexer.get_driver(self.conf)

    self.chef = chef.Chef(
        self.coord, self.incoming, self.index, self.store)
if aggregation not in self.metric.archive_policy.aggregation_methods:
abort(404, {
"cause": "Aggregation method does not exist for this metric",
"detail": {
"metric": self.metric.id,
"aggregation_method": aggregation,
},
})
aggregations = []
for g in sorted(granularity, reverse=True):
agg = self.metric.archive_policy.get_aggregation(
aggregation, g)
if agg is None:
abort(404, six.text_type(
storage.AggregationDoesNotExist(
self.metric, aggregation, g)
))
aggregations.append(agg)
if (strtobool("refresh", refresh) and
pecan.request.incoming.has_unprocessed(self.metric.id)):
try:
pecan.request.chef.refresh_metrics(
[self.metric],
pecan.request.conf.api.operation_timeout)
except chef.SackAlreadyLocked:
abort(503, 'Unable to refresh metric: %s. Metric is locked. '
'Please try again.' % self.metric.id)
try:
results = pecan.request.storage.get_aggregated_measures(
{self.metric: aggregations},
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import six
from gnocchi import carbonara
from gnocchi.common import redis
from gnocchi import storage
from gnocchi import utils
class RedisStorage(storage.StorageDriver):
WRITE_FULL = True
STORAGE_PREFIX = b"timeseries"
FIELD_SEP = '_'
FIELD_SEP_B = b'_'
_SCRIPTS = {
"list_split_keys": """
local metric_key = KEYS[1]
local ids = {}
local cursor = 0
local substring = "([^%s]*)%s([^%s]*)%s([^%s]*)"
repeat
local result = redis.call("HSCAN", metric_key, cursor, "MATCH", ARGV[1])
cursor = tonumber(result[1])
for i, v in ipairs(result[2]) do
help="Skip index upgrade."),
cfg.BoolOpt("skip-storage", default=False,
help="Skip storage upgrade."),
cfg.BoolOpt("skip-incoming", default=False,
help="Skip incoming storage upgrade."),
cfg.BoolOpt("skip-archive-policies-creation", default=False,
help="Skip default archive policies creation."),
sack_number_opt,
])
conf = service.prepare_service(conf=conf, log_to_std=True)
if not conf.skip_index:
index = indexer.get_driver(conf)
LOG.info("Upgrading indexer %s", index)
index.upgrade()
if not conf.skip_storage:
s = storage.get_driver(conf)
LOG.info("Upgrading storage %s", s)
s.upgrade()
if not conf.skip_incoming:
i = incoming.get_driver(conf)
LOG.info("Upgrading incoming storage %s", i)
i.upgrade(conf.sacks_number)
if (not conf.skip_archive_policies_creation
and not index.list_archive_policies()
and not index.list_archive_policy_rules()):
if conf.skip_index:
index = indexer.get_driver(conf)
for name, ap in six.iteritems(archive_policy.DEFAULT_ARCHIVE_POLICIES):
index.create_archive_policy(ap)
index.create_archive_policy_rule("default", "*", "low")
def metricd_tester(conf):
    """Process new measures sack by sack, in the current process.

    The indexer, storage and incoming drivers are built from ``conf`` and
    handed to a ``chef.Chef`` created without a coordinator (``None``).
    Each sack yielded by the incoming driver is then processed until the
    running total returned by the chef reaches
    ``conf.stop_after_processing_metrics``.

    :param conf: configuration object passed to the driver factories; it
                 must expose ``stop_after_processing_metrics``.
    """
    # NOTE(sileht): This method is designed to be profiled; we
    # want to avoid issues with the profiler and os.fork(), which is
    # why we don't use the MetricdServiceManager.
    index = indexer.get_driver(conf)
    s = storage.get_driver(conf)
    inc = incoming.get_driver(conf)
    c = chef.Chef(None, inc, index, s)
    metrics_count = 0
    for sack in inc.iter_sacks():
        try:
            # Pass the sack being iterated. The storage driver ``s`` was
            # passed here before, which left the loop variable unused.
            metrics_count += c.process_new_measures_for_sack(sack, True)
        except chef.SackAlreadyLocked:
            # This sack is locked; skip it and try the next one.
            continue
        if metrics_count >= conf.stop_after_processing_metrics:
            break
def bulk_delete(conn, container, objects):
    """Delete several objects of a container with one bulk-delete request.

    :param conn: connection object providing ``post_account``.
    :param container: name of the container holding the objects.
    :param objects: iterable of mappings, each with a ``'name'`` key.
    :raises storage.StorageError: if the response status is not 200.
    """
    # Build the quoted "/<container>/<name>" path of every object.
    quoted_paths = []
    for entry in objects:
        raw_path = '/%s/%s' % (container, entry['name'])
        quoted_paths.append(quote(raw_path.encode('utf-8')))

    # The request body is one encoded path per line.
    payload = b''.join(
        path.encode('utf-8') + b'\n' for path in quoted_paths)

    # post_account fills this dict in; the status is read from it below.
    response_info = {}
    headers, body = conn.post_account(
        headers=POST_HEADERS,
        query_string='bulk-delete',
        data=payload,
        response_dict=response_info)
    if response_info['status'] != 200:
        raise storage.StorageError(
            "Unable to bulk-delete, is bulk-delete enabled in Swift?")

    summary = swift_utils.parse_api_response(headers, body)
    LOG.debug('# of objects deleted: %s, # of objects skipped: %s',
              summary['Number Deleted'], summary['Number Not Found'])
if not granularity:
abort(400, 'A granularity must be specified to resample')
try:
resample = utils.to_timespan(resample)
except ValueError as e:
abort(400, e)
transform = [carbonara.Transformation("resample", (resample,))]
if (strtobool("refresh", refresh) and
pecan.request.incoming.has_unprocessed(self.metric)):
try:
pecan.request.storage.refresh_metric(
pecan.request.indexer, pecan.request.incoming, self.metric,
pecan.request.conf.api.refresh_timeout)
except storage.SackLockTimeoutError as e:
abort(503, e)
try:
if aggregation in self.custom_agg:
warnings.warn("moving_average aggregation is deprecated.",
category=DeprecationWarning)
return self.custom_agg[aggregation].compute(
pecan.request.storage, self.metric,
start, stop, **param)
return pecan.request.storage.get_measures(
self.metric, start, stop, aggregation,
utils.to_timespan(granularity)
if granularity is not None else None,
transform)
except (storage.MetricDoesNotExist,
storage.GranularityDoesNotExist,
storage.AggregationDoesNotExist) as e: