Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_simple(self):
    """Test __init__.

    Builds the static plugin once without settings and checks its short
    name, then builds it with a list containing one status and checks that
    statuses() returns that status keyed by its 'id'.
    """
    p = static.StaticPlugin()
    # assertEqual replaces the deprecated assertEquals alias, which no
    # longer exists on unittest.TestCase in Python 3.12+.
    self.assertEqual(p.short_name, 'static')

    s = base.Status('test', 2, 'test')
    p = static.StaticPlugin({'statuses': [s]})
    self.assertEqual(p.statuses(), {s['id']: s})
def test_run_update_status(self):
    """Run 'runplugins' twice and check that the stored status is updated.

    The status description is changed between the two runs; the command
    output must mention both the plugin run and the update, and the first
    stored Status model must carry the new description.
    """
    buffer = StringIO()
    self.addCleanup(buffer.close)

    fake_status = base.Status('Test status', 5, 'http://foo/#5')

    # NOTE(review): the original indentation was lost; everything below is
    # assumed to run inside the with-block -- confirm against the test file.
    with self.components_with_plugin(fake_status):
        management.call_command('runplugins', stdout=buffer)
        fake_status.description = 'status description'
        management.call_command('runplugins', stdout=buffer)

        # Both messages are expected in the accumulated command output.
        for expected_line in (
                "Running test Production:Fake plugin",
                "Updated Fake plugin:Test status"):
            self.assertIn(expected_line, buffer.getvalue())

        stored = models.Status.objects.all()[0]
        self.assertEqual(stored.description, fake_status['description'])
def test_basic(self):
    """Test that we can create statuses.

    A Status built from a title, a defcon level, a link and a description
    must convert to a dict holding those values, the fixed UUID asserted
    below as 'id', and None for both 'time_start' and 'time_end'.
    """
    title = 'Test status'
    link = 'http://github.com/criteo/defcon'
    text = 'This is a test'

    status = base.Status(title, 5, link, description=text)

    expected = dict(
        defcon=5,
        title=title,
        description=text,
        link=link,
        id=uuid.UUID('235c445a-cd6f-5f46-91af-bada596275a6'),
        time_start=None,
        time_end=None,
    )
    self.assertEqual(dict(status), expected)
def test_plugin(self):
"""Test that we can build plugins."""
class _FakePlugin(base.Plugin):
"""Fake class for testing."""
def __init__(self, settings):
"""Fake __init__."""
super(_FakePlugin, self).__init__(settings)
@property
def short_name(self):
return 'fake'
@property
def name(self):
return 'fake'
def statuses(self):
"""Fake statuses."""
logging.debug('alert is inactive')
return None
# New API
if status.get('state', 'active') != 'active':
logging.debug('alert is inactived: %s' % status)
return None
# Guess the level.
if type(self.defcon) == int:
defcon = self.defcon
elif callable(self.defcon):
defcon = self.defcon(alert)
else:
defcon = int(alert['labels'][self.defcon])
status = base.Status(
self.render(self.title_template, alert),
defcon,
self.render(self.link_template, alert),
description=self.render(self.description_template, alert),
time_start=dateparse.parse_datetime(alert['startsAt']),
time_end=dateparse.parse_datetime(alert['endsAt']),
)
return status
def _to_status(self, payload, url):
    """Return a status built from an endpoint payload.

    Args:
        payload: mapping queried with .get() for 'name', 'link', 'defcon'
            and 'description'.
        url: where the payload came from; used in the debug log line and
            in the fallback texts for missing fields.

    Returns:
        A base.Status. Missing fields fall back to a placeholder text
        mentioning ``url`` (or to 5 for 'defcon').
    """
    # Pass the argument to logging instead of interpolating with '%': the
    # message is only formatted when DEBUG is enabled, and a tuple-valued
    # url can no longer raise TypeError during interpolation.
    logging.debug('Handling %s', url)
    # If the endpoint does not provide all the information, default
    # messages are sent instead.
    status = base.Status(
        title=payload.get('name', "name not found from {}".format(url)),
        link=payload.get('link', "link not from {}".format(url)),
        defcon=payload.get('defcon', 5),
        description=payload.get('description', "description not found from {}".format(url)),
    )
    return status
from defcon.plugins import base
# Hard-coded secret key.
# NOTE(review): presumably the Django SECRET_KEY setting for a demo/test
# configuration -- confirm it is never used for a real deployment.
SECRET_KEY = 'cepowqjcenwqcnewqoinwqowq'
# Debug flag, enabled in this configuration.
DEBUG = True
# Base URL of the Alertmanager API; passed as the 'api' entry of the
# alertmanager plugin config in PLUGINS_PRODUCTION below.
ALERTMANAGER_URL = 'http://demo.robustperception.io:9093/api/v1/'
PLUGINS_PRODUCTION = [
# Some static statuses.
{
'plugin': 'static',
'name': 'static test',
'config': {
'statuses': [
base.Status('Test status', 5, 'http://foo/#5'),
base.Status('Other test', 2, 'http://bar/#1')
]
}
},
# For a specific job.
{
'plugin': 'alertmanager',
'name': 'alertmanager-labels',
'config': {
'api': ALERTMANAGER_URL,
'labels': {'job': 'prometheus'},
'defcon': 2,
}
},
# For a specific receiver.
{
def _to_status(self, issue):
    """Return a status built from an issue.

    Args:
        issue: object exposing ``fields.summary``, a ``raw`` mapping that
            contains a 'self' key, and a ``permalink()`` method.

    Returns:
        A base.Status whose title, link and description are rendered from
        the plugin templates with the issue data.
    """
    # Pass the argument to logging instead of interpolating with '%': the
    # message is only formatted when DEBUG is enabled, and a tuple-valued
    # summary can no longer raise TypeError during interpolation.
    logging.debug('Handling %s', issue.fields.summary)

    # Guess the level: self.defcon is either a callable taking the issue
    # or the level itself.
    if callable(self.defcon):
        defcon = self.defcon(issue)
    else:
        defcon = self.defcon

    # Template context: a copy of the raw issue data in which the 'self'
    # entry is exposed as 'me', plus the issue object and its permalink.
    data = dict(issue.raw)
    data['me'] = data.pop('self')
    data['issue'] = issue
    data['permalink'] = issue.permalink()

    status = base.Status(
        self.render(self.title_template, data),
        defcon,
        self.render(self.link_template, data),
        description=self.render(self.description_template, data),
        # TODO: use some custom fields for dates.
        # time_start=dateparse.parse_datetime(alert['startsAt']),
        # time_end=dateparse.parse_datetime(alert['endsAt']),
    )
    return status
"""DefCon Static plugin."""
from defcon.plugins import base
class StaticPlugin(base.Plugin):
"""DefCon Static plugin.
Config:
* statuses: dict(uuid -> status)
Example:
```python
{
'statuses': {
'a4ce3a48-d3bc-4474-aaf3-52db3d3213f8' : {
'defcon': 5,
'title': 'Test status',
'description': 'This is a test status.',
'metadata': '{"foo": "bar"}',
'link': 'http://github.com/criteo/defcon',
'time_start': '2017-04-18T08:24:21.920695Z',