Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_swap_memory(self):
out = sh('env PATH=/usr/sbin:/sbin:%s swap -l -k' % os.environ['PATH'])
lines = out.strip().split('\n')[1:]
if not lines:
raise ValueError('no swap device(s) configured')
total = free = 0
for line in lines:
line = line.split()
t, f = line[-2:]
t = t.replace('K', '')
f = f.replace('K', '')
total += int(int(t) * 1024)
free += int(int(f) * 1024)
used = total - free
psutil_swap = psutil.swap_memory()
self.assertEqual(psutil_swap.total, total)
self.assertEqual(psutil_swap.used, used)
self.assertEqual(psutil_swap.free, free)
def test_get_swap_swapped_out(self):
swap = self.service.get_swap_space()
self.assertEqual(swap['swapped_out'], psutil.swap_memory().sout)
def test_swapmem_total(self):
out = sh('sysctl vm.swapusage')
out = out.replace('vm.swapusage: ', '')
total, used, free = re.findall('\d+.\d+\w', out)
psutil_smem = psutil.swap_memory()
self.assertEqual(psutil_smem.total, human2bytes(total))
self.assertEqual(psutil_smem.used, human2bytes(used))
self.assertEqual(psutil_smem.free, human2bytes(free))
def memory(part, chart=None):
if part == 'memory':
if chart in ['line', 'column']:
return render_template('memory/memory-%s.html' % chart, part=part, chart=chart)
context = psutil.virtual_memory()
elif not chart and part == 'swap':
context = psutil.swap_memory()
else:
return render_template('404.html'), 404
return render_template('memory/%s.html' % part, context=context, part=part)
def _getvalues(self):
val = {}
swap = psutil.swap_memory()
val['SwapTotal'] = int(swap.total / 1024 / 1024)
val['SwapFree'] = int(swap.free / 1024 / 1024)
return val
def swap(self, event):
"""Swap utilisation
- ``system:swap:percent``
- ``system:swap:absolute``
"""
swap = psutil.swap_memory()
self.riemann.event(service='system:swap:percent', metric_f=swap.percent)
self.riemann.event(service='system:swap:absolute', metric_f=swap.used)
def get_swap():
Mem = psutil.swap_memory()
return int(Mem.total/1024.0), int(Mem.used/1024.0)
streamer.log("sample", x)
# Get total CPU usage
cpu_percent = psutil.cpu_percent()
streamer.log("cpu_total", cpu_percent)
# Get individual CPU usage
cpu_percents = psutil.cpu_percent(percpu=True)
streamer.log_object(cpu_percents, key_prefix="cpu")
# Get the virtual memory usage
memory = psutil.virtual_memory()
streamer.log_object(memory, key_prefix="virtual_mem")
# Get the swap memory usage
swap = psutil.swap_memory()
streamer.log_object(swap, key_prefix="swap_mem")
# Get the network usage
network = psutil.net_io_counters()
streamer.log_object(network, key_prefix="net_io")
# flush the stream to ensure optimal buffer and consumption experience
streamer.flush()
# sleep before sampling again
time.sleep(sample_rate_in_ms/1000)
# cleanup the stream and ensure logs are flushed
streamer.close()
def _update(self):
"""
Protected update method, for subclasses to define
how to populate the profile data on the specific platform.
The class will call this method first, followed by post operations
to clean the data if needed.
"""
memstats = psutil.virtual_memory()
swapstats = psutil.swap_memory()
b_to_mb = 1024 ** 2
self.data.update({
'freeRamMb': memstats.available / b_to_mb,
'totalRamMb': memstats.total / b_to_mb,
'freeSwapMb': swapstats.free / b_to_mb,
'totalSwapMb': swapstats.total / b_to_mb,
})