Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.backends[name] = (
metricd.get_coordinator_and_start(
str(uuid.uuid4()).encode(),
self.conf.coordination_url)
)
elif name == "storage":
self.backends[name] = (
gnocchi_storage.get_driver(self.conf)
)
elif name == "incoming":
self.backends[name] = (
gnocchi_incoming.get_driver(self.conf)
)
elif name == "indexer":
self.backends[name] = (
gnocchi_indexer.get_driver(self.conf)
)
else:
raise RuntimeError("Unknown driver %s" % name)
return self.backends[name]
sack_number_opt = copy.copy(_SACK_NUMBER_OPT)
sack_number_opt.default = 128
conf.register_cli_opts([
cfg.BoolOpt("skip-index", default=False,
help="Skip index upgrade."),
cfg.BoolOpt("skip-storage", default=False,
help="Skip storage upgrade."),
cfg.BoolOpt("skip-incoming", default=False,
help="Skip incoming storage upgrade."),
cfg.BoolOpt("skip-archive-policies-creation", default=False,
help="Skip default archive policies creation."),
sack_number_opt,
])
conf = service.prepare_service(conf=conf, log_to_std=True)
if not conf.skip_index:
index = indexer.get_driver(conf)
LOG.info("Upgrading indexer %s", index)
index.upgrade()
if not conf.skip_storage:
s = storage.get_driver(conf)
LOG.info("Upgrading storage %s", s)
s.upgrade()
if not conf.skip_incoming:
i = incoming.get_driver(conf)
LOG.info("Upgrading incoming storage %s", i)
i.upgrade(conf.sacks_number)
if (not conf.skip_archive_policies_creation
and not index.list_archive_policies()
and not index.list_archive_policy_rules()):
if conf.skip_index:
index = indexer.get_driver(conf)
LOG.info("Upgrading indexer %s", index)
index.upgrade()
if not conf.skip_storage:
s = storage.get_driver(conf)
LOG.info("Upgrading storage %s", s)
s.upgrade()
if not conf.skip_incoming:
i = incoming.get_driver(conf)
LOG.info("Upgrading incoming storage %s", i)
i.upgrade(conf.sacks_number)
if (not conf.skip_archive_policies_creation
and not index.list_archive_policies()
and not index.list_archive_policy_rules()):
if conf.skip_index:
index = indexer.get_driver(conf)
for name, ap in six.iteritems(archive_policy.DEFAULT_ARCHIVE_POLICIES):
index.create_archive_policy(ap)
index.create_archive_policy_rule("default", "*", "low")
def _configure(self):
    """Set up the coordinator, the drivers and the chef for this worker.

    Populates ``self.coord``, ``self.store``, ``self.incoming``,
    ``self.index`` and ``self.chef`` using ``self.conf``.
    """
    hostname = socket.gethostname()
    worker = self.worker_id
    # NOTE(jd) Still use a uuid here so we're sure there's no conflict in
    # case of crash/restart
    random_suffix = str(uuid.uuid4())
    member_id = ("%s.%s.%s" % (hostname, worker, random_suffix)).encode()

    # The coordinator is started under the identifier built above.
    coordination_url = self.conf.coordination_url
    self.coord = get_coordinator_and_start(member_id, coordination_url)

    # One driver per backend, all built from the same configuration.
    self.store = storage.get_driver(self.conf)
    self.incoming = incoming.get_driver(self.conf)
    self.index = indexer.get_driver(self.conf)

    # The chef is handed the coordinator and the three drivers.
    self.chef = chef.Chef(
        self.coord, self.incoming, self.index, self.store)
def injector():
conf = cfg.ConfigOpts()
conf.register_cli_opts([
cfg.IntOpt("metrics", default=1, min=1),
cfg.StrOpt("archive-policy-name", default="low"),
cfg.StrOpt("creator", default="admin"),
cfg.IntOpt("batch-of-measures", default=1000),
cfg.IntOpt("measures-per-batch", default=10),
])
conf = service.prepare_service(conf=conf)
index = indexer.get_driver(conf)
instore = incoming.get_driver(conf)
def todo():
metric = index.create_metric(
uuid.uuid4(),
creator=conf.creator,
archive_policy_name=conf.archive_policy_name)
for _ in six.moves.range(conf.batch_of_measures):
measures = [
incoming.Measure(
utils.dt_in_unix_ns(utils.utcnow()), random.random())
for __ in six.moves.range(conf.measures_per_batch)]
instore.add_measures(metric, measures)
with futures.ThreadPoolExecutor(max_workers=conf.metrics) as executor:
def metricd_tester(conf):
    """Process new measures sack by sack, in the current process.

    Builds the indexer, storage and incoming drivers from ``conf``, wires
    them into a ``chef.Chef`` created without a coordinator, then walks the
    sacks yielded by ``inc.iter_sacks()``. Iteration stops as soon as the
    accumulated count reaches ``conf.stop_after_processing_metrics``.

    :param conf: configuration object passed to each ``get_driver`` call;
                 it must expose ``stop_after_processing_metrics``.
    """
    # NOTE(sileht): This method is designed to be profiled, we
    # want to avoid issues with profiler and os.fork(), that's
    # why we don't use the MetricdServiceManager.
    index = indexer.get_driver(conf)
    s = storage.get_driver(conf)
    inc = incoming.get_driver(conf)
    c = chef.Chef(None, inc, index, s)
    metrics_count = 0
    for sack in inc.iter_sacks():
        try:
            # Pass the sack being iterated. The previous code passed the
            # storage driver ``s`` here and never used the loop variable.
            # NOTE(review): confirm the first positional parameter of
            # Chef.process_new_measures_for_sack is the sack.
            metrics_count += c.process_new_measures_for_sack(sack, True)
        except chef.SackAlreadyLocked:
            # Nothing to do for a sack that is already locked; move on to
            # the next one.
            continue
        if metrics_count >= conf.stop_after_processing_metrics:
            break