Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def probe(self):
    """Run the configured command once per host and collect the parsed output.

    For every entry in ``self.hosts`` the ``self.command`` template is
    formatted with ``host=<entry>``, split into an argument list with
    ``shlex.split`` (no shell is involved) and executed with stderr merged
    into stdout.  The combined output is decoded with the ``yaml`` codec
    obtained from ``munge``.

    Returns:
        dict: ``{'data': [...], 'ts': <float>}`` where ``data`` holds one
        decoded document per host, in ``self.hosts`` order, and ``ts`` is
        the number of seconds since 1970-01-01 (UTC) taken before the first
        command is started.
    """
    codec = munge.get_codec('yaml')()
    msg = {}
    msg['data'] = []
    msg['ts'] = (datetime.datetime.utcnow() - datetime.datetime(1970, 1, 1)).total_seconds()

    # FIXME use poll
    for host in self.hosts:
        args = shlex.split(self.command.format(host=host))
        proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

        # TODO poll, timeout, maybe from parent process for better control?
        try:
            # NOTE(review): the command output is fed straight to the YAML
            # codec; confirm the codec uses a safe loader if the command
            # output is not trusted.
            with proc.stdout:
                msg['data'].append(codec.load(proc.stdout))
        finally:
            # Reap the child so it does not linger as a zombie.  The pipe is
            # already closed here, so a child that is still writing cannot
            # block on a full pipe while we wait.
            proc.wait()

    return msg
# NOTE(review): this is the tail of a method whose `def` line is not visible
# here; `command` and `self` are defined outside the visible lines.
# `port` is not referenced in the visible lines -- presumably the traceroute
# base UDP port; TODO confirm it is still needed.
port = 33434
# -f first_ttl
# -m max_ttl
# Argument list for the external command, targeting self.mtr_host.
args = [
command,
# NOTE(review): presumably "numeric output, no DNS lookups" -- confirm
# against the man page of the tool named by `command`.
'-n',
# -w wait time seconds
'-w1',
# -q number of queries
'-q1',
self.mtr_host,
]
# get both stdout and stderr
proc = self.popen(args, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
# The pipe is closed when the `with` block exits.
with proc.stdout:
# iter(readline, b'') yields lines until readline returns b'' (EOF), so
# parse_traceroute receives an iterator over the raw output lines (bytes).
hosts = self.parse_traceroute(iter(proc.stdout.readline, b''))
return hosts
def _run_proc(self):
    """Run the configured command and return its parsed output lines.

    The command line is ``self.config['command']`` followed by the fixed
    flags ``-u``, ``-C<self.count>``, ``-p<self.period>`` and ``-e``, then
    whatever ``self.hosts_args()`` returns.  stderr is merged into stdout,
    and every output line is decoded as UTF-8 and passed to
    ``self.parse_verbose``.

    Returns:
        list: one ``self.parse_verbose`` result per output line, in output
        order.
    """
    args = [
        self.config['command'],
        '-u',
        '-C%d' % self.count,
        '-p%d' % self.period,
        '-e',
    ]
    args.extend(self.hosts_args())

    # get both stdout and stderr
    proc = self.popen(args, stdout=subprocess.PIPE,
                      stderr=subprocess.STDOUT)

    # TODO poll, timeout, maybe from parent process for better control?
    try:
        with proc.stdout:
            # readline returns b'' only at EOF, which ends the iteration.
            return [
                self.parse_verbose(line.decode("utf-8"))
                for line in iter(proc.stdout.readline, b'')
            ]
    finally:
        # Reap the child so it does not linger as a zombie; the pipe is
        # already closed at this point.  Assumes self.popen returns a
        # subprocess.Popen-compatible object -- TODO confirm.
        proc.wait()
def popen(self, *args):
    """Start ``sflowtool`` with the given arguments and return the process.

    The full command line is logged at debug level before the process is
    started.  stdout and stderr are captured on separate pipes.

    Args:
        *args: command-line arguments appended after ``sflowtool``; they
            are joined with spaces for the log line, so each must be a
            string.

    Returns:
        subprocess.Popen: the started process.
    """
    command_line = ['sflowtool'] + list(args)
    self.log.debug(' '.join(command_line))
    return subprocess.Popen(
        command_line,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
def poll_process(self, cmd, stdout, stderr):
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
while True:
try:
fds = [proc.stdout.fileno(), proc.stderr.fileno()]
ret = select.select(fds, [], [])
for fd in ret[0]:
if fd == proc.stdout.fileno():
read = proc.stdout.readline()
#stdout.append(read)
if fd == proc.stderr.fileno():
read = proc.stderr.readline()
self.log.error("stderr" + read)
if proc.poll() != None:
self.log.warning("poll break")
break