Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_metric_submit_query_switch(self):
"""
Endpoints are different for submission and queries.
"""
Metric.send(points=(123, 456))
self.request_called_with('POST', API_HOST + "/api/v1/series",
data={'series': [{'points': [[123, 456.0]], 'host': api._host_name}]})
Metric.query(start="val1", end="val2")
self.request_called_with('GET', API_HOST + "/api/v1/query",
params={'from': "val1", 'to': "val2"})
from datadog import initialize, api
from time import time
from json import dump
options = {'api_key': '',
'app_key': ''}
initialize(**options)
end = int(time()) # Specify the time period over which you want to fetch the data in seconds
start = end - 3600
query = 'system.cpu.idle{*}' # Enter the metric you want, see your list here: https://app.datadoghq.com/metric/summary.
# Optionally, enter your host to filter the data, see here: https://app.datadoghq.com/infrastructure
results = api.Metric.query(start=start - 3600, end=end, query=query)
with open("output.json", "w") as f:
dump(results, f)
# This creates a file named output.json in the current folder
from_seconds_ago,
to_seconds_ago):
"""
Queries datadog for a specific metric, potentially with some
function applied to it and returns the results.
:param query: The datadog query to execute (see datadog docs)
:type query: str
:param from_seconds_ago: How many seconds ago to start querying for.
:type from_seconds_ago: int
:param to_seconds_ago: Up to how many seconds ago to query for.
:type to_seconds_ago: int
"""
now = int(time.time())
response = api.Metric.query(
start=now - from_seconds_ago,
end=now - to_seconds_ago,
query=query)
self.validate_response(response)
return response
def get_series(self, start, end, query):
"""Get time series points.
Args:
start (int): Unix timestamp.
end (int): Unix timestamp.
query (string): Datadog query.
"""
j = api.Metric.query(start=start, end=end, query=query)
if 'errors' in j:
msg = 'Datadog: %s' % j['errors']
raise RuntimeError(msg)
if 'status' in j and j['status'] != 'ok':
msg = 'Datadog: API status was NOT ok: %s' % j['status']
raise RuntimeError(msg)
series = []
for d in j['series']:
sre = re.search(',?host:([^,\}]+)', d['scope'])
host = '*' if sre is None else sre.group(1)
# p = [ timestamp, value ]
series += [{'src_metric': d['metric'],
def check_datadog(objective: models.Objective, query_datetime: arrow.Arrow) -> float:
response = api.Metric.query(
start=query_datetime.shift(days=-30).timestamp,
end=query_datetime.timestamp,
query=objective.indicator_query,
)
series = response["series"][0]
point = series["pointlist"][0]
_, point_value = point
return round(Decimal(point_value), 4)
#(vim) !wq
#crontab -l
#########################################################################
from datadog import initialize, api
import time
options = {
'api_key': '149***e4',
'app_key': 'f2e***a5'
}
initialize(**options)
now = int(time.time())
query = 'system.cpu.system{*}' # Modify the metric you want to track the freshness of
metric = api.Metric.query(start=now - 3600, end=now, query=query) # how far back in time you want to go (the default is one hour)
nb_points = len(metric['series'][0]['pointlist']) - 1
last_submission = int(metric['series'][0]['pointlist'][nb_points][0]) / 1000
freshness = now - last_submission
print "freshness is: ", freshness
api.Metric.send(metric='freshness', points=freshness, tags=["metric:" + str(query)])
def get_metrics(auth_key, metrics):
options = {
'api_key': auth_key['api_key'],
'app_key': auth_key['app_key']
}
initialize(**options)
end = int(time.time())
start = end - 60
results = []
for metric in metrics:
result = api.Metric.query(start=start, end=end, query=metric['metric'])
results.append(result)
return results