Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_init():
with pytest.raises(ValueError):
vaping.plugins.zeromq.ZeroMQ({}, None)
config = {
'bind': 'tcp://*:5555',
'connect': 'tcp://*:5555',
}
with pytest.raises(ValueError):
vaping.plugins.zeromq.ZeroMQ(config, None)
vaping.plugins.zeromq.ZeroMQ({'bind': 'tcp://*:5555'}, None)
'output': [
'emit_store_1',
'emit_store_2'
]
}
],
}
anon_config = {
'type': 'plugin0',
'var0': 999,
}
@plugin.register('plugin0')
class Plugin0(plugins.PluginBase):
pass
@plugin.register('emit0')
class EmitPlugin0(plugins.EmitBase):
def emit(self, message):
pass
@plugin.register('emit_abc')
class EmitPluginABC(plugins.EmitBase):
# emit not defined to test TypeError
pass
@plugin.register('emit_store')
class EmitPluginStore(plugins.EmitBase):
import time
import vaping.plugins
from vaping import plugin
config = {
"type" : "test_tsdb",
"filename" : "{a}-{b}-{field}",
"field" : "test"
}
@plugin.register("test_tsdb")
class TSDBTestPlugin(vaping.plugins.TimeSeriesDB):
"""
Test plugin from the TimeSeriesDB abstraction
"""
def __init__(self, config, ctx):
super(TSDBTestPlugin, self).__init__(config, ctx)
self.updated = {}
def create(self, filename):
self.created = True
def update(self, filename, time, value):
self.updated[filename] = (time, value)
def test_filename_format():
def test_parse_verbose():
fping = vaping.plugins.fping.FPing({'interval': '5s'}, None)
for line, expected in list(expect_verbose.items()):
res = fping.parse_verbose(line)
if expected:
for k, v in list(expected.items()):
assert res
if isinstance(v, float):
assert abs(v - res[k]) < 0.002
else:
assert v == res[k]
else:
assert not res
if line.startswith('CNTR'):
keys = (x[0] for x in _KEYDEF['CNTR'])
typs = (x[1] for x in _KEYDEF['CNTR'])
return {k: t(d) for (d, k, t) in zip(line.split(','), keys, typs)}
def calc_rate(last, cur):
now = datetime.datetime.utcnow()
time_delta = (now - last['ts']).total_seconds()
in_delta = cur['in_oct'] - last['data']['in_oct']
in_bps = in_delta * 8 / time_delta
print("time_delta=%f in_delta=%d Gbps=%f" % (time_delta, in_delta, old_div(in_bps, 1000000000)))
@vaping.plugin.register('sflowtool')
class SflowTool(vaping.plugins.TimedProbe):
default_config={
'interval': '1m',
'count': 5,
}
def init(self):
self.stdout_queue = Queue()
self.stderr_queue = Queue()
args = [
'sflowtool',
'-l',
]
self.spawn_process(args)
def poll_process(self, cmd, stdout, stderr):
from __future__ import absolute_import
import vaping
import vaping.config
try:
import whisper
except ImportError:
whisper = None
@vaping.plugin.register('whisper')
class WhisperPlugin(vaping.plugins.TimeSeriesDB):
"""
Whisper plugin that allows vaping to persist data
in a whisper database
"""
def __init__(self, config, ctx):
if not whisper:
raise ImportError("whisper not found")
super(WhisperPlugin, self).__init__(config, ctx)
# whisper specific config
self.retention = self.config.get("retention", ['3s:1d'])
self.x_files_factor = float(self.config.get("x_files_factor", 0.5))
self.aggregation_method = self.config.get("aggregation_method", "average")
self.sparse = bool(self.config.get("sparse", False))
from __future__ import absolute_import
import logging
import re
import datetime
import vaping
import vaping.config
@vaping.plugin.register("logparse")
class LogParse(vaping.plugins.FileProbe):
"""
Log parse plugin base
Will parse a log line by line and probe to emit data
over a specified interval.
# Config
- path (`str`): log file path
- fields (`dict`): field definition
field name as key
`parser` regex pattern to parse field value, needs to
one group in it
from __future__ import absolute_import
import collections
import datetime
import munge
import shlex
import vaping
from vaping.io import subprocess
@vaping.plugin.register('command')
class CommandProbe(vaping.plugins.TimedProbe):
"""
Probe type plugin that allows you to run an arbitrary
command for each host and return the command output as
data
# Config
- command (`str`): command to run (use `{host}` to reference the host)
- interval (`float`): time between probes
# Instanced Attributes
- command (`str`): command to run
"""
from __future__ import absolute_import, division, print_function
import collections
import logging
import vaping
from vaping.io import subprocess
from vaping.util import which
class FPingBase(vaping.plugins.TimedProbe):
"""
FPing base plugin
config:
# Config
- command (`str=fping`): command to run
- interval (`str=1m`): time between pings
- count (`int=5`): number of pings to send
- period (`int=20`): time in milliseconds that fping waits
between successive packets to an individual target
# Instanced Attributes
- count (`int`): number of fpings to send