Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_events(self):
now = datetime.datetime.now()
now_ts = int(time.mktime(now.timetuple()))
now_title = "end test title " + str(now_ts)
now_message = "test message " + str(now_ts)
before_ts = int(time.mktime((now - datetime.timedelta(minutes=5)).timetuple()))
before_title = "start test title " + str(before_ts)
before_message = "test message " + str(before_ts)
now_event = dog.Event.create(
title=now_title, text=now_message, date_happened=now_ts
)
before_event = dog.Event.create(
title=before_title, text=before_message, date_happened=before_ts
)
assert now_event["event"]["title"] == now_title
assert now_event["event"]["text"] == now_message
assert now_event["event"]["date_happened"] == now_ts
assert before_event["event"]["title"] == before_title
assert before_event["event"]["text"] == before_message
assert before_event["event"]["date_happened"] == before_ts
# The returned event doesn"t contain host information, we need to get it separately
event_id = dog.Event.create(
title="test host", text="test host", host=self.host_name
text="test no hostname",
attach_host_name=False,
alert_type="success",
)["event"]["id"]
event = get_with_retry("Event", event_id)
assert not event["event"]["host"]
assert event["event"]["alert_type"] == "success"
event = dog.Event.create(
title="test tags", text="test tags", tags=["test_tag:1", "test_tag:2"]
)
assert "test_tag:1" in event["event"]["tags"]
assert "test_tag:2" in event["event"]["tags"]
now_ts = int(time.mktime(datetime.datetime.now().timetuple()))
event_id = dog.Event.create(
title="test source",
text="test source",
source_type_name="vsphere",
priority="low",
)["event"]["id"]
get_with_retry("Event", event_id)
events = dog.Event.query(
start=now_ts - 100, end=now_ts + 100, priority="low", sources="vsphere"
)
assert events["events"], "No events found in stream"
assert event_id in [event["id"] for event in events["events"]]
'aggregation_key': options.name,
'host': host,
'priority': options.priority or event_priority,
'tags': tags
}
if options.buffer_outs:
if is_p3k():
stderr = stderr.decode('utf-8')
stdout = stdout.decode('utf-8')
print(stderr.strip(), file=sys.stderr)
print(stdout.strip(), file=sys.stdout)
if options.submit_mode == 'all' or returncode != 0:
api.Event.create(title=event_title, text=event_body, **event)
sys.exit(returncode)
from datadog import initialize, api
import time
options = {
'api_key': '',
'app_key': ''
}
initialize(**options)
end_time = time.time()
start_time = end_time - 100
api.Event.query(
start=start_time,
end=end_time,
priority="normal",
tags=["application:web"],
unaggregated=true
)
from datadog import initialize, api
options = {
'api_key': '',
'app_key': ''
}
initialize(**options)
api.Event.get(2603387619536318140)
def _show(cls, args):
api._timeout = args.timeout
format = args.format
res = api.Event.get(args.event_id)
report_warnings(res)
report_errors(res)
if format == 'pretty':
prettyprint_event_details(res['event'])
elif format == 'raw':
print(json.dumps(res))
else:
print_event_details(res['event'])
def create_event(depl, title, text='', tags=[]):
if not depl.datadog_notify: return
initializeDatadog()
try:
api.Event.create(title=title, text=text, tags=tags + [ 'uuid:{}'.format(depl.uuid), 'deployment:{}'.format(depl.name)])
except:
depl.logger.warn('Failed creating event in datadog, ignoring.')
def create_event(self, alert_type):
prefix = ""
if deployment.is_disconnected:
prefix = (
"Cannot detect exact changes, this is only the new latest commit:\n"
)
elif deployment.is_rollback:
prefix = "Rollback:\n"
datadog.api.Event.create(
title="{0} deployment".format(environ["CI_PROJECT_PATH"]),
text=prefix + "\n".join(commit.summary for commit in deployment.commits),
tags=[
"releaser:{0}".format(environ["GITLAB_USER_EMAIL"]),
"project:{0}".format(environ["CI_PROJECT_PATH"]),
"environment:{0}".format(environ["CI_ENVIRONMENT_NAME"]),
],
alert_type=alert_type,
)
from datadog import initialize, api
options = {
'api_key': '9775a026f1ca7d1c6c5af9d94d9595a4',
'app_key': '87ce4a24b5553d2e482ea8a8500e71b8ad4554ff'
}
initialize(**options)
title = "Something big happened!"
text = 'And let me tell you all about it here!'
tags = ['version:1', 'application:web']
api.Event.create(title=title, text=text, tags=tags)
def _send_event(self, title, alert_type=None, text=None, tags=None, host=None, event_type=None, event_object=None):
if tags is None:
tags = []
tags.extend(self.default_tags)
priority = 'normal' if alert_type == 'error' else 'low'
try:
datadog.api.Event.create(
title=title,
text=text,
alert_type=alert_type,
priority=priority,
tags=tags,
host=host,
source_type_name='ansible',
event_type=event_type,
event_object=event_object,
)
except Exception as e:
# We don't want Ansible to fail on an API error
print('Couldn\'t send event "{0}" to Datadog'.format(title))
print(e)