Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_net_through(str_interface):
"""
get network interface statistics by a interface (eth0, e,g,)
"""
decorators.needlinux(True)
rx_bytes = tx_bytes = -1
fp = open('/proc/net/dev', 'r')
try:
for line in fp:
if str_interface in line:
data = line.split('%s:' % str_interface)[1].split()
rx_bytes, tx_bytes = (data[0], data[8])
finally:
fp.close()
if rx_bytes < 0 or tx_bytes < 0:
msg = 'Failed to parse /proc/net/dev'
warnings.warn(msg, RuntimeWarning)
cup.unittest.assert_ge(rx_bytes, 0)
cup.unittest.assert_ge(tx_bytes, 0)
return (int(rx_bytes), int(tx_bytes))
def get_boottime_since_epoch():
"""
:return:
return boot time (seconds) since epoch
"""
decorators.needlinux(True)
fp = open('/proc/stat', 'r')
try:
for line in fp:
if line.startswith('btime'):
return float(line.strip().split()[1])
raise RuntimeError("line 'btime' not found")
finally:
fp.close()
def get_meminfo():
"""
get system memory info
"""
decorators.needlinux(True)
total = free = buffers = cached = active = inactive = None
fp = open('/proc/meminfo', 'r')
try:
for line in fp:
if line.startswith('MemTotal'):
total = int(line.split()[1]) * 1024
elif line.startswith('MemFree'):
free = int(line.split()[1]) * 1024
elif line.startswith('Buffers'):
buffers = int(line.split()[1]) * 1024
elif line.startswith('Cached:'):
cached = int(line.split()[1]) * 1024
elif line.startswith('Active:'):
active = int(line.split()[1]) * 1024
elif line.startswith('Inactive:'):
inactive = int(line.split()[1]) * 1024
def pids():
"""Returns a list of PIDs currently running on the system."""
decorators.needlinux(True)
return [int(x) for x in os.listdir(b'/proc') if x.isdigit()]
'lo':
(
235805206817, 235805206817, 315060887, 315060887, 0, 0, 0, 0
),
'eth1':
(
18508976300272, 8079464483699, 32776530804,
32719666038, 0, 0, 708015, 0
),
'eth0':
(
0, 0, 0, 0, 0, 0, 0, 0
)
}
"""
decorators.needlinux(True)
fhandle = open("/proc/net/dev", "r")
try:
lines = fhandle.readlines()
finally:
fhandle.close()
retdict = {}
for line in lines[2:]:
colon = line.rfind(':')
assert colon > 0, repr(line)
name = line[:colon].strip()
fields = line[colon + 1:].strip().split()
bytes_recv = int(fields[0])
packets_recv = int(fields[1])
errin = int(fields[2])
dropin = int(fields[3])
bytes_sent = int(fields[8])
case the cached instance is updated.
yuetian:
1. the origion use a check_running function to check whether
PID has been reused by another process in which case yield a new
Process instance
hint:i did not use check_running function because the container of
pid is set
2. the origion use a sorted list(_pmap.items()) +
list(dict.fromkeys(new_pids).items()
to get pid and proc to make res.proc is only a instance of a pid Process
hint(bugs):i did not use fromkeys(new_pids) because i did not get the meanning
of using proc
"""
decorators.needlinux(True)
pid_set = set(pids())
for pid in pid_set:
try:
check_process = Process(pid)
res = check_process.get_process_name()
except cup.err.NoSuchProcess:
pass
else:
yield check_process
@decorators.needlinux
def _real_rmrf(fpath, safemode):
"""
real rmrf
"""
if safemode:
if os.path.normpath(os.path.abspath(fpath)) == '/':
raise err.ShellException('cannot rmtree root / under safemode')
if os.path.isfile(fpath):
os.unlink(fpath)
else:
shutil.rmtree(fpath)
return _real_rmrf(fpath, safemode)