Handle a metric data-collection failure: start or extend the error grace period, or promote the metric to ERROR state once the grace period expires.
:param error: exception object that is the result of metric data collection
or (possibly pending) resource status check
"""
# Quarantine the metric temporarily
self._metricInfoCache[metricObj.uid].quarantineEndTime = (
startTime +
metricObj.poll_interval * self._METRIC_QUARANTINE_DURATION_RATIO)
try:
if metricObj.collector_error is None:
# TODO: unit-test
# Begin error grace period for this metric
deadline = time.time() + self._metricErrorGracePeriod
with engine.connect() as conn:
repository.retryOnTransientErrors(repository.setMetricCollectorError)(
conn,
metricObj.uid,
utils.jsonEncode(dict(deadline=deadline,
message=repr(error))))
self._log.error(
"Started error grace period on metric=<%r> through %sZ due to "
"error=%r",
metricObj, datetime.utcfromtimestamp(deadline).isoformat(), error)
elif (time.time() >
utils.jsonDecode(metricObj.collector_error)["deadline"]):
# TODO: unit-test
# Error grace period expired: promote the metric to ERROR state
with engine.connect() as conn:
repository.retryOnTransientErrors(repository.setMetricStatus)(
conn, metricObj.uid, MetricStatus.ERROR, repr(error))
self._log.error(
with engine.connect() as conn:
repository.retryOnTransientErrors(repository.setMetricCollectorError)(
conn,
metricObj.uid,
utils.jsonEncode(dict(deadline=deadline,
message=repr(error))))
self._log.error(
"Started error grace period on metric=<%r> through %sZ due to "
"error=%r",
metricObj, datetime.utcfromtimestamp(deadline).isoformat(), error)
elif (time.time() >
utils.jsonDecode(metricObj.collector_error)["deadline"]):
# TODO: unit-test
# Error grace period expired: promote the metric to ERROR state
with engine.connect() as conn:
repository.retryOnTransientErrors(repository.setMetricStatus)(
conn, metricObj.uid, MetricStatus.ERROR, repr(error))
self._log.error(
"Metric Collector: grace period expired; placed metric=<%r> in "
"ERROR state due to error=%r", metricObj, error)
except app_exceptions.ObjectNotFoundError:
# TODO: unit-test
self._log.warning("Metric deleted? metric=%r", metricObj, exc_info=True)
# Metric data or resource status collection error
numErrors += 1
# Update metric error grace period and quarantine info
self._handleMetricCollectionError(engine,
metricObj,
startTime=collectResult.creationTime,
error=error)
else:
if metricObj.collector_error is not None:
oldErrorInfo = metricObj.collector_error
# There was pending collector error before, but no error this time, so
# clear metric's collector_error
try:
with engine.connect() as conn:
repository.retryOnTransientErrors(
repository.setMetricCollectorError)(conn, metricObj.uid, None)
except app_exceptions.ObjectNotFoundError:
self._log.warning("Metric deleted?", exc_info=True)
self._log.info("metric=<%r> exited error grace state %s",
metricObj, oldErrorInfo)
return (numEmpty, numErrors)
request = requests[metricCollection.refID]
metricObj = request.metric
data = None
if metricCollection.slices:
aggregationFn = getAggregationFn(metricObj)
if aggregationFn:
data = aggregate(metricCollection.slices,
aggregationFn=aggregationFn)
else:
data = aggregate(metricCollection.slices)
try:
with engine.connect() as conn:
repository.retryOnTransientErrors(repository.setMetricLastTimestamp)(
conn, metricObj.uid, metricCollection.nextMetricTime)
except ObjectNotFoundError:
self._log.warning("Processing autostack data collection results for "
"unknown model=%s (model deleted?)", metricObj.uid)
continue
if data:
try:
self.metricStreamer.streamMetricData(data,
metricID=metricObj.uid,
modelSwapper=modelSwapper)
except ObjectNotFoundError:
# We expect that the model exists but in the odd case that it has
# already been deleted we don't want to crash the process.
self._log.info("Metric not found when adding data. metric=%s" %
metricObj.uid)
continue # Skip old
if row["anomaly"] >= settingObj.sensitivity:
# First let's clear any old users out of the database.
with engine.connect() as conn:
repository.retryOnTransientErrors(
repository.deleteStaleNotificationDevices)(
conn, _NOTIFICATION_DEVICE_STALE_DAYS)
# If anomaly_score meets or exceeds any of the device
# notification sensitivity settings, trigger notification.
# repository.addNotification() will handle throttling.
notificationId = str(uuid.uuid4())
with engine.connect() as conn:
result = repository.retryOnTransientErrors(
repository.addNotification)(conn,
uid=notificationId,
server=resource,
metric=metricId,
rowid=row["rowid"],
device=settingObj.uid,
windowsize=(
settingObj.windowsize),
timestamp=rowDatetime,
acknowledged=0,
seen=0)
self._log.info("NOTIFICATION=%s SERVER=%s METRICID=%s DEVICE=%s "
"Notification generated. " % (notificationId,
resource, metricId,
settingObj.uid))
def _getCandidateMetrics(self, engine):
  """ Return the metrics that are due for an update

  :param engine: SQLAlchemy engine object
  :type engine: sqlalchemy.engine.Engine

  :returns: a (possibly empty) mapping of metric uid to Metric instance for
    metrics that are due for an update (after removing quarantined metrics)

  TODO: unit-test
  """
  with engine.connect() as conn:
    metricsToUpdate = repository.retryOnTransientErrors(
      repository.getCloudwatchMetricsPendingDataCollection)(conn)

  # Remove quarantined metrics: a metric whose quarantineEndTime is still in
  # the future was recently polled (or errored) and must not be re-polled yet.
  if metricsToUpdate:
    now = time.time()
    quarantinedIndexes = set(
      i for i, m in enumerate(metricsToUpdate)
      if now < self._metricInfoCache[m.uid].quarantineEndTime)

    metricsToUpdate = OrderedDict((m.uid, m)
                                  for i, m in enumerate(metricsToUpdate)
                                  if i not in quarantinedIndexes)

  # NOTE: the original ended the all-quarantined branch with a bare `return`,
  # which yielded None; return the (possibly empty) collection instead so the
  # result always matches the documented contract. Empty is still falsy, so
  # callers that test truthiness behave identically.
  return metricsToUpdate
grok.app.config.loadConfig() # reload config on every batch
engine = repository.engineFactory()
# Cache minimum threshold to trigger any notification to avoid permuting
# settings x metricDataRows
try:
try:
batch = AnomalyService.deserializeModelResult(message.body)
except Exception:
self._log.exception("Error deserializing model result")
raise
# Load all settings for all users (once per incoming batch)
with engine.connect() as conn:
settings = repository.retryOnTransientErrors(
repository.getAllNotificationSettings)(conn)
self._log.debug("settings: %r" % settings)
if settings:
minThreshold = min(setting.sensitivity for setting in settings)
else:
minThreshold = 0.99999
metricInfo = batch["metric"]
metricId = metricInfo["uid"]
resource = metricInfo["resource"]
for row in batch["results"]: