Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_missing_config_raises_configuration_error(exists):
with pytest.raises(ConfigurationError):
settings()
def _find_form(self, conf):
"""
Find elements defined in conf['form'].
Render all Jinja2 templates from field['value'].
Save all field['click'] triggers.
If all found, return them as a list of dictionaries.
If at least one was not found, return empty list.
"""
form = conf.get('form', [])
fields = []
creds = settings().creds
first_field = True
for field in form:
click = field.get('click')
text = self._parse_field_text(field, conf, creds)
selector_type, selector = self._parse_field_selector(field)
if selector:
element = self._find_element(
selector,
selector_type,
check_displayed=first_field,
)
if element:
fields.append({
'element': element,
'text': text,
'click': click,
def _exec_scenario(self, code, conf, elements):
logger.info("Executing custom scenario")
logger.debug(code)
exec(
code,
{
'conf': conf,
'creds': settings().creds,
'driver': self.driver,
'elements': elements,
},
def load_url(self, creds_key, value):
if value:
return value
else:
webhook_creds = settings().creds[creds_key]
return webhook_creds['url']
def __init__(self, conf, value, **kwargs):
self.mailgun_creds = settings().creds.get('mailgun', {})
self.mailgun_creds.update(value or {})
domain = self.mailgun_creds['domain']
self.context = {
'subject': 'Kibitzr update for ' + conf['name'],
'from': 'Kibitzr '.format(domain),
'to': [self.mailgun_creds['to']],
}
super(MailgunNotify, self).__init__(conf=conf, value=value, **kwargs)
def run(self, once=False, log_level=logging.INFO, names=None):
# Reset global state for testability:
self.signals.update({
'reload_conf_pending': False,
'interrupted': False,
'open_backdoor': False,
})
self.setup_logger(log_level)
self.connect_signals()
try:
while True:
if self.signals['interrupted']:
return 1
if self.signals['reload_conf_pending']:
settings().reread()
self.signals['reload_conf_pending'] = False
checkers = Checker.create_from_settings(
checks=settings().checks,
names=names
)
if checkers:
self.before_start(checkers)
self.execute_all(checkers)
if once:
return 0
else:
self.check_forever(checkers)
else:
logger.warning("No checks defined. Exiting")
return 1
finally:
def fetch_by_python(code, conf):
logger.info("Fetch using Python script")
logger.debug(code)
assert 'content' in code, PYTHON_ERROR
try:
# ok, content = False, None
namespace = {'ok': True}
context = {
'conf': conf,
'stash': LazyStash(),
'creds': settings().creds,
}
exec(code, context, namespace)
return namespace['ok'], namespace['content']
except:
logger.exception("Python fetcher raised an Exception")
return False, traceback.format_exc()
def __init__(self, conf, value):
self.code = value
self.context = {
'conf': conf,
'creds': settings().creds,
}
def __init__(self, url=None, *args, **kwargs):
zapier_conf = settings().notifiers.get('zapier', {})
zapier_conf.update(settings().creds.get('zapier', {}))
if url is not None:
zapier_conf.update({'url': url})
self.url = zapier_conf['url']
self.session = requests.Session()
def post_gitter(conf, report, **kwargs):
gitter = settings().notifiers.get('gitter', {})
gitter.update(settings().creds.get('gitter', {}))
response = requests.post(
gitter['url'],
data={"message": report},
)
logger.debug(response.text)
response.raise_for_status()
return response