# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_init(self):
    """A default-constructed ZabbixSender must target 127.0.0.1:10051."""
    sender = ZabbixSender()
    self.assertEqual(type(sender).__name__, 'ZabbixSender')
    # zabbix_uri is a list of (host, port) tuples.
    self.assertEqual(isinstance(sender.zabbix_uri[0], tuple), True)
    host, port = sender.zabbix_uri[0]
    self.assertEqual(host, '127.0.0.1')
    self.assertEqual(port, 10051)
def test_create_messages(self):
    """_create_messages must produce one list entry per metric."""
    metrics = [
        ZabbixMetric('host1', 'key1', 1),
        ZabbixMetric('host2', 'key2', 2),
    ]
    sender = ZabbixSender()
    messages = sender._create_messages(metrics)
    self.assertIsInstance(messages, list)
    self.assertEqual(len(messages), 2)
def test_load_from_config(self):
    """_load_from_config must parse ServerActive from a zabbix_agentd.conf."""
    config_path = os.path.join(os.path.dirname(__file__), 'data/zabbix_agentd.conf')
    sender = ZabbixSender()
    servers = sender._load_from_config(config_file=config_path)
    self.assertEqual(servers, [('192.168.1.2', 10051)])
def test_get_response(self, mock_socket):
    """_get_response must read header then body and decode the JSON reply."""
    # Feed the canned header and body back from successive recv() calls.
    mock_socket.recv.side_effect = (self.resp_header, self.resp_body)
    sender = ZabbixSender()
    reply = sender._get_response(mock_socket)
    # The body read is requested with the length taken from the header (92).
    mock_socket.recv.assert_has_calls([call(92)])
    self.assertEqual(reply['response'], 'success')
def test_init_config_exception(self):
    """Passing a nonexistent config file via use_config must raise."""
    missing = os.path.join(os.path.dirname(__file__), 'zabbix_agent.conf')
    with self.assertRaises(Exception):
        ZabbixSender(use_config=missing)
def test_sendMetricsToServer(self):
    """Sending two metrics to a local server must report success.

    NOTE: integration-style test — requires a Zabbix trapper listening
    on 127.0.0.1:10051.
    """
    timestamp = int(now())
    metrics = [
        ZabbixMetric('host2', 'key3', 'IDDQD'),
        ZabbixMetric('host1', 'key1', 33.1, timestamp),
    ]
    sent = ZabbixSender('127.0.0.1', 10051).send(metrics)
    self.assertEqual(sent, True)
# Human-readable form of the mongostat command line.  Only referenced by the
# commented-out debug prints below; the actual invocation uses the argv list
# `cmd`, which avoids shell interpolation of the credentials.
arg = "-u " + muser + " -p " + mpass + " -h " + mongohost + ":" + mongoport + " --authenticationDatabase=admin --rowcount 1 --noheaders"
cmd = ['mongostat', "-u", muser, "-p", mpass, "-h", mongohost + ":" + mongoport, "--authenticationDatabase=admin", "--rowcount", "1", "--noheaders"]
# Run mongostat once (single row, no header line) and capture both streams.
# NOTE(review): without text=True/universal_newlines, `out`/`err` are bytes
# under Python 3, which would break the str-based re.* calls below — this
# script presumably targets Python 2; confirm the interpreter.
r = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = r.communicate()
res = out + err
#print("DEBUG: STR: " + arg)
#print("DEBUG: RES: " + res)
#print("DEBUG: ERR: " + err + str(len(err)))
# state 0 == "mongodb down" for the mongodb_state item.
state = 0
# Any stderr output is treated as a failure: report the down state and the
# raw error text to Zabbix, then exit non-zero.
if len(err) > 0:
packet = [ ZabbixMetric(zbhost, 'mongodb_state', state),
ZabbixMetric(zbhost, 'mongodb_errstr', err) ]
result = ZabbixSender(zabbix_port = ZBPORT, zabbix_server = ZBSERVER).send(packet)
print(err)
sys.exit(1)
# Normalise the single mongostat row: trim trailing whitespace, drop the
# '*' markers mongostat puts on some counters, strip leading spaces, then
# split on runs of spaces into the column array consumed further below.
res = res.rstrip()
res = res.replace('*','')
res = re.sub("^ +","",res)
arr = re.split(" +", res)
def str_to_int(s):
    """Convert a mongostat figure such as '7', '12k' or '1.5k' to an int.

    A trailing lowercase 'k' multiplies the value by 1000 (matching the
    original code's only handled suffix).  Returns None when *s* does not
    start with a number, mirroring the original fall-through behaviour.

    Fixes over the original:
    - the character class was written '(\\[a-z]|[A-Z])', which matches a
      literal '[' — replaced with a real alphabetic class;
    - the decimal pattern used an unescaped '.' and then discarded the
      fractional digits, so '1.5k' came out as 1000 instead of 1500;
    - the computed value was never returned.
    """
    m = re.match(r'^(\d+(?:\.\d+)?)([a-zA-Z])?', s)
    if not m:
        return None
    value = float(m.group(1))
    if m.group(2) == 'k':
        value *= 1000
    return int(value)
def send_metric(self, hostname, key, data):
    """Send one measurement to Zabbix, either immediately or batched.

    When ``self.send_aggregated_metrics`` is set, the metric is buffered
    and the whole buffer is flushed once it exceeds
    ``self.metrics_chunk_size``; otherwise it is sent right away.
    Send failures are logged and swallowed (best-effort delivery).
    """
    zm = ZabbixMetric(hostname, key, data)
    if self.send_aggregated_metrics:
        self.aggregated_metrics.append(zm)
        # Flush once the buffer exceeds the configured chunk size.
        if len(self.aggregated_metrics) > self.metrics_chunk_size:
            # Lazy %-args instead of eager string formatting.
            self.logger.info("Sending: %s metrics", len(self.aggregated_metrics))
            try:
                ZabbixSender(zabbix_server=self.sender_host,
                             zabbix_port=self.sender_port).send(self.aggregated_metrics)
                # Only clear the buffer after a successful send, so a
                # failed flush retries these metrics next time.
                self.aggregated_metrics = []
            except Exception as e:
                self.logger.exception(e)
    else:
        try:
            # BUG FIX: ZabbixSender.send() expects a list of metrics; the
            # original passed the bare ZabbixMetric object.
            ZabbixSender(zabbix_server=self.sender_host,
                         zabbix_port=self.sender_port).send([zm])
        except Exception as e:
            self.logger.exception(e)
def alert(self, matches):
    """Forward alert matches to Zabbix as value-1 metrics.

    ``matches`` is a list of match dictionaries; it contains more than one
    entry when the alert has the aggregation option set.  Each match's
    ``@timestamp`` (ISO-8601 with a trailing 'Z', i.e. UTC) becomes the
    metric's clock.
    """
    from datetime import timezone  # local import: keeps module imports untouched

    zm = []
    for match in matches:
        # BUG FIX: the original used strftime('%s'), which is a
        # non-standard strftime directive (fails on Windows) and
        # interprets the naive datetime in LOCAL time even though
        # @timestamp is UTC.  Attach UTC explicitly and use timestamp().
        dt = datetime.strptime(match['@timestamp'], "%Y-%m-%dT%H:%M:%S.%fZ")
        ts_epoch = int(dt.replace(tzinfo=timezone.utc).timestamp())
        zm.append(ZabbixMetric(host=self.zbx_host, key=self.zbx_key,
                               value=1, clock=ts_epoch))
    ZabbixSender(zabbix_server=self.zbx_sender_host,
                 zabbix_port=self.zbx_sender_port).send(zm)
# NOTE(review): this `else:` pairs with a condition outside the visible
# chunk — presumably the config-validation branch of the surrounding script.
else:
config['sleep_seconds'] = int(config['sleep_seconds'])
# Log to stdout at the level named in the config (e.g. "info" -> INFO).
logging.basicConfig(
stream=sys.stdout,
level=getattr(logging, config['log_level'].upper()))
# read history of previous runs
errors = []
# Run history is stored next to the config file as watchdog-history.json.
fs_history = path.abspath(path.join(path.dirname(fs_config), 'watchdog-history.json'))
zbx = None
result_code = None
# Zabbix reporting is optional: only enabled when the config names a host.
if 'zabbix_host' in config:
zbx = ZabbixSender(config['zabbix_host'])
# Main watchdog loop (body continues beyond this chunk).
while True:
try:
if path.exists(fs_history):
# NOTE(review): the open() handle is never closed here; left
# as-is because this is a comment-only pass.
previous_history = json.load(open(fs_history, 'r'))
else:
log.info("Starting with empty history.")
previous_history = dict()
# fetch submissions from mail server
log.debug("Fetching previous submissions from IMAP server")
history = fetch_test_submissions(previous_history=previous_history, config=config)
# check for failed test submissions
max_process_secs = int(config['max_process_secs'])
now = datetime.now()