Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
mock_send.return_value = response
mock_send.side_effect = None
# check that we're as expected
assert obj.notify(
title='test', body='body',
notify_type=NotifyType.INFO) == response
else:
for exception in test_exceptions:
mock_send.side_effect = exception
mock_send.return_value = None
try:
assert obj.notify(
title='test', body='body',
notify_type=NotifyType.INFO) is False
except AssertionError:
# Don't mess with these entries
raise
except Exception:
# We can't handle this exception type
raise
except AssertionError:
# Don't mess with these entries
print('%s AssertionError' % url)
raise
except Exception as e:
# Check that we were expecting this exception to happen
notify_type=apprise.NotifyType.INFO) is True
# Restore the variable for remaining tests
setattr(ssl, 'PROTOCOL_TLS', ssl_temp_swap)
else:
# Handle case where it is not missing
setattr(ssl, 'PROTOCOL_TLS', ssl.PROTOCOL_TLSv1)
# Test our URL
url = 'xmpps://user:pass@example.com'
obj = apprise.Apprise.instantiate(url, suppress_exceptions=False)
# Test we loaded
assert isinstance(obj, apprise.plugins.NotifyXMPP) is True
assert obj.notify(
title='title', body='body',
notify_type=apprise.NotifyType.INFO) is True
# Restore settings as they were
del ssl.PROTOCOL_TLS
urls = (
{
'u': 'xmpps://user:pass@example.com',
'p': 'xmpps://user:****@example.com',
}, {
'u': 'xmpps://user:pass@example.com?'
'xep=30,199,garbage,xep_99999999',
'p': 'xmpps://user:****@example.com',
}, {
'u': 'xmpps://user:pass@example.com?xep=ignored',
'p': 'xmpps://user:****@example.com',
}, {
reload(sys.modules['apprise.Apprise'])
reload(sys.modules['apprise'])
# Create our instance
obj = apprise.Apprise.instantiate('gnome://', suppress_exceptions=False)
obj.duration = 0
# Check that it found our mocked environments
assert obj._enabled is True
# Test url() call
assert isinstance(obj.url(), six.string_types) is True
# test notifications
assert obj.notify(title='title', body='body',
notify_type=apprise.NotifyType.INFO) is True
# test notification without a title
assert obj.notify(title='', body='body',
notify_type=apprise.NotifyType.INFO) is True
obj = apprise.Apprise.instantiate(
'gnome://_/?image=True', suppress_exceptions=False)
assert isinstance(obj, apprise.plugins.NotifyGnome) is True
assert obj.notify(title='title', body='body',
notify_type=apprise.NotifyType.INFO) is True
obj = apprise.Apprise.instantiate(
'gnome://_/?image=False', suppress_exceptions=False)
assert isinstance(obj, apprise.plugins.NotifyGnome) is True
assert obj.notify(title='title', body='body',
notify_type=apprise.NotifyType.INFO) is True
# Test that our default settings over-ride base settings since they are
# not the same as the one specified in the base; this check merely
# ensures our plugin inheritance is working properly
assert obj.body_maxlen == plugins.NotifyTelegram.body_maxlen
# We don't override the title maxlen so we should be set to the same
# as our parent class in this case
assert obj.title_maxlen == plugins.NotifyBase.title_maxlen
# This tests erroneous messages involving multiple chat ids
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO) is False
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO) is False
assert nimg_obj.notify(
body='body', title='title', notify_type=NotifyType.INFO) is False
# This tests erroneous messages involving a single chat id
obj = plugins.NotifyTelegram(bot_token=bot_token, targets='l2g')
nimg_obj = plugins.NotifyTelegram(bot_token=bot_token, targets='l2g')
nimg_obj.asset = AppriseAsset(image_path_mask=False, image_url_mask=False)
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO) is False
assert nimg_obj.notify(
body='body', title='title', notify_type=NotifyType.INFO) is False
# Bot Token Detection
# Just to make it clear to people reading this code and trying to learn
# what is going on. Apprise tries to detect the bot owner if you don't
# specify a user to message. The idea is to just default to messaging
# the bot owner himself (it makes it easier for people). So we're testing
"# Heading 2 ##\n\nTest\n\n" + \
"more content\n" + \
"even more content \t\r\n\n\n" + \
"# Heading 3 ##\n\n\n" + \
"normal content\n" + \
"# heading 4\n" + \
"#### Heading 5"
results = obj.extract_markdown_sections(test_markdown)
assert isinstance(results, list) is True
# We should have 5 sections (since there are 5 headers identified above)
assert len(results) == 5
# Use our test markdown string during a notification
assert obj.notify(
body=test_markdown, title='title', notify_type=NotifyType.INFO) is True
# Create an apprise instance
a = Apprise()
# Our processing is slightly different when we aren't using markdown
# as we do not pre-parse content during our notifications
assert a.add(
'discord://{webhook_id}/{webhook_token}/'
'?format=markdown&footer=Yes'.format(
webhook_id=webhook_id,
webhook_token=webhook_token)) is True
# This call includes an image with its payload:
assert a.notify(body=test_markdown, title='title',
notify_type=NotifyType.INFO,
body_format=NotifyFormat.TEXT) is True
title="my title", body="my body", notify_type='bad') is True
# No Title/Body combos
assert a.notify(title=None, body=None) is False
assert a.notify(title='', body=None) is False
assert a.notify(title=None, body='') is False
# As long as one is present, we're good
assert a.notify(title=None, body='present') is True
assert a.notify(title='present', body=None) is True
assert a.notify(title="present", body="present") is True
# Send Attachment with success
attach = join(TEST_VAR_DIR, 'apprise-test.gif')
assert a.notify(
body='body', title='test', notify_type=NotifyType.INFO,
attach=attach) is True
# Send the attachment as an AppriseAttachment object
assert a.notify(
body='body', title='test', notify_type=NotifyType.INFO,
attach=AppriseAttachment(attach)) is True
# test an invalid attachment
assert a.notify(
body='body', title='test', notify_type=NotifyType.INFO,
attach='invalid://') is False
# Repeat the same tests above...
# however do it by directly accessing the object; this grants the similar
# results:
assert a[0].notify(
# Our path doesn't exist anymore using this logic
assert a.image_path(
NotifyType.INFO,
NotifyImageSize.XY_256,
must_exist=True) is None
# Return our permission so we don't have any problems with our cleanup
chmod(dirname(sub.strpath), 0o700)
# Our content is retrievable again
assert a.image_raw(NotifyType.INFO, NotifyImageSize.XY_256) is not None
# our file path is accessible again too
assert a.image_path(
NotifyType.INFO,
NotifyImageSize.XY_256,
must_exist=True) is not None
# We do the same test, but set the permission on the file
chmod(a.image_path(NotifyType.INFO, NotifyImageSize.XY_256), 0o000)
# our path will still exist in this case
assert a.image_path(
NotifyType.INFO,
NotifyImageSize.XY_256,
must_exist=True) is not None
# but we will not be able to open it
assert a.image_raw(NotifyType.INFO, NotifyImageSize.XY_256) is None
# Restore our permissions
@click.option('--notification-type', '-n', default=NotifyType.INFO, type=str,
metavar='TYPE',
help='Specify the message type (default=info). Possible values'
' are "{}", and "{}".'.format(
'", "'.join(NOTIFY_TYPES[:-1]), NOTIFY_TYPES[-1]))
@click.option('--theme', '-T', default='default', type=str, metavar='THEME',
help='Specify the default theme.')
@click.option('--tag', '-g', default=None, type=str, multiple=True,
metavar='TAG', help='Specify one or more tags to filter '
'which services to notify. Use multiple --tag (-g) entries to '
'"OR" the tags together and comma separated to "AND" them. '
'If no tags are specified then all services are notified.')
@click.option('--dry-run', '-d', is_flag=True,
help='Perform a trial run but only prints the notification '
'services to-be triggered to stdout. Notifications are never '
'sent using this mode.')
@click.option('--verbose', '-v', count=True)
def monitor_and_notify(docker_client, apobj):
    """Poll the swarm in an endless loop and forward results via ``apobj``.

    Each pass calls ``monitor_swarm`` to obtain a ``(status, message)`` pair,
    optionally prepends ``msg_prefix`` to the message, decides whether a
    notification should go out, and then sleeps for ``check_interval``.

    The loop has no exit condition, so this function only stops when an
    exception propagates out of it.

    ``monitor_swarm``, ``white_pattern_list``, ``black_list``, ``msg_prefix``,
    ``check_interval`` and ``logger`` are all read from outside this function.

    NOTE(review): the nesting below was reconstructed from a copy of this
    code that had lost its indentation -- confirm against the real file.

    :param docker_client: passed straight through to ``monitor_swarm``.
    :param apobj: object whose ``notify(body=..., title=..., notify_type=...)``
        method is used to deliver the message.
    """
    logger.info("Starting monitor")

    # Flipped after every notification that is sent (see the note below).
    alert_flag = False

    while True:
        level, message = monitor_swarm(docker_client, white_pattern_list, black_list)
        logger.debug(" Ouput of monitor: " + level + " " + message)

        if msg_prefix != "":
            message = "%s\n%s" % (msg_prefix, message)

        # Same truth table as the longer form
        #   (level == INFO and alert_flag) or not alert_flag
        # i.e. always notify while the flag is clear; once it is set, only an
        # INFO result gets through.
        should_notify = level == NotifyType.INFO or not alert_flag
        if should_notify:
            logger.debug("Sending notification:" + message)
            apobj.notify(
                body=message,
                title='SwarmAlert',
                notify_type=level,
            )
            # NOTE(review): the flag is inverted regardless of `level`, so a
            # run of consecutive INFO results appears to notify on every
            # pass -- confirm this is intended.
            alert_flag = not alert_flag

        time.sleep(check_interval)
services = docker_client.services.list()
if len(white_list) > 0:
services = [s for s in services if s.name in white_list and s.name not in black_list]
else:
services = [s for s in services if s.name not in black_list]
services_name = [service.name for service in services]
logger.debug(str(services_name))
not_running_services = [service for service in services if(len(service.tasks({'desired-state':'Running'})) == 0)]
logger.debug("Not running:" + str([service.name for service in not_running_services]))
err_msg = ""
if len(not_running_services) != 0:
err_msg = "Detected Stopped Services: \n%s\n%s" % (service_list_to_str(not_running_services), err_msg)
if err_msg == "":
return NotifyType.INFO, "No stopped services"
else:
return NotifyType.FAILURE, err_msg