Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_sys_per_cpu_times(self):
    # Every per-CPU entry must be summable, printable and made only of
    # non-negative floats.
    for cpu_entry in psutil.cpu_times(percpu=True):
        sum(cpu_entry)  # must not raise
        running_total = 0
        for value in cpu_entry:
            self.assertIsInstance(value, float)
            self.assertGreaterEqual(value, 0.0)
            running_total += value
        # A manual accumulation has to agree with the builtin sum().
        self.assertEqual(running_total, sum(cpu_entry))
        str(cpu_entry)  # must not raise
    # Per-CPU and system-wide results must have the same number of fields.
    first_cpu = psutil.cpu_times(percpu=True)[0]
    system_wide = psutil.cpu_times(percpu=False)
    self.assertEqual(len(first_cpu), len(system_wide))
def test_sys_per_cpu_times_percent_negative(self):
    # see: https://github.com/giampaolo/psutil/issues/645
    # Call once before patching, then feed all-zero readings through
    # cpu_times() and validate every percentage that comes back.
    psutil.cpu_times_percent(percpu=True)
    zero_times = []
    for sample in psutil.cpu_times(percpu=True):
        # Same namedtuple type as the real reading, every field set to 0.
        zero_times.append(sample._make([0] * len(sample._fields)))
    with mock.patch('psutil.cpu_times', return_value=zero_times):
        per_cpu_percents = psutil.cpu_times_percent(percpu=True)
        for cpu in per_cpu_percents:
            for percent in cpu:
                self._test_cpu_percent(percent, None, None)
def __update__(self, input_stats):
    """
    Update the CPU stats.

    Reads the system-wide CPU times, adds up the counters that are
    present on this platform and, on the first call only, stores the
    reading and its total on the instance as the baseline.
    ``input_stats`` is not used by the code shown here.
    """
    # CPU
    cputime = psutil.cpu_times()
    cputime_total = cputime.user + cputime.system + cputime.idle
    # Only available on some OS
    for optional_field in ('nice', 'iowait', 'irq', 'softirq'):
        if hasattr(cputime, optional_field):
            cputime_total += getattr(cputime, optional_field)
    # First call: no previous reading exists yet, so remember this one.
    # NOTE(review): the original indentation was lost; this assumes the
    # reset of self.cpu belongs to the first-call branch - confirm.
    if not hasattr(self, 'cputime_old'):
        self.cputime_old = cputime
        self.cputime_total_old = cputime_total
        self.cpu = {}
def get_total_time(pid):
    """Return the summed user and system CPU time for ``pid``.

    A ``pid`` of -1 selects the system-wide CPU times instead of the
    process's own. Children times are added when the reading exposes
    them. Returns 0 when ``get_process`` yields nothing for ``pid``.
    """
    proc = get_process(pid)
    if not proc:
        return 0
    if pid == -1:
        times = psutil.cpu_times()
    else:
        times = proc.cpu_times()
    own_time = times.user + times.system
    # children_* fields are optional on the reading; treat missing as 0.
    return (own_time
            + getattr(times, "children_user", 0)
            + getattr(times, "children_system", 0))
def cpu_stats(self):
    """
    Return the user, system and idle CPU times of each (logical) CPU.

    The result maps the zero-based CPU index to a dict with the keys
    'user', 'system' and 'idle'.
    """
    return {
        index: {
            'user': core.user,
            'system': core.system,
            'idle': core.idle
        }
        for index, core in enumerate(psutil.cpu_times(percpu=True))
    }
updateInterval=float(self.update) #timer interval in seconds
totcpupct=0
for proc in psutil.process_iter():
try:
psutil.Process(proc.pid).is_running()
#if proc.pid == 37196:
#check if process still exists, if so, update dictionaries
try:
#check if process previously existed - if so we can calculate a cpupct
proctimeHold=self.ui.d_procTimes[proc.pid]
proctime=proc.get_cpu_times() #get
deltaProcTime=(proctime.user+proctime.system) - (proctimeHold.user+proctimeHold.system)
cputimeHold=self.ui.d_cpuTimes[proc.pid]
cputime=psutil.cpu_times()
deltaCPUTime=(cputime.user+cputime.system+cputime.idle) - (cputimeHold.user+cputimeHold.system+cputimeHold.idle)
if deltaProcTime > 0:
if deltaCPUTime < updateInterval:
deltaCPUTime=updateInterval
else:
pass
cpupct=float(deltaProcTime)/float(deltaCPUTime)*100.0
else:
cpupct=0
if cpupct < 0:
cpupct=0
cpupct=float(int(float(cpupct)*100))/100 #only keep two decimal places
def check(self, instance):
    """Submit the core count and one rate per CPU-time field per core.

    Tags listed under ``instance['tags']`` are attached to every metric;
    per-core metrics additionally carry a ``core:<index>`` tag.
    """
    base_tags = instance.get('tags', [])
    per_core = psutil.cpu_times(percpu=True)
    self.gauge('system.core.count', len(per_core), tags=base_tags)
    for core_index, core_times in enumerate(per_core):
        core_tags = base_tags + ['core:{0}'.format(core_index)]
        for field, raw_value in iteritems(core_times._asdict()):
            metric_name = 'system.core.{0}'.format(field)
            self.rate(metric_name, 100.0 * raw_value, tags=core_tags)
def get_performance_metrics(cls, application_uptime_s, queue_sizes=None):
    """Get sensor hardware and os performance statistics.

    Args:
        application_uptime_s: Application uptime, copied unchanged into
            the result under "application_uptime_seconds".
        queue_sizes: Optional mapping copied by reference into the result
            under "queue_sizes". When omitted, a new empty dict is used
            for each call.

    Returns:
        dict with the keys "queue_sizes", "scan_program", "timestamp",
        "cpu_percent", "cpu_times", "mem", "root_vol", "data_vol" and
        "application_uptime_seconds".
    """
    # A `{}` default would be one dict shared by every call, and because it
    # is handed back inside the result, a caller mutating it would leak
    # state into later calls. Build a fresh dict instead.
    if queue_sizes is None:
        queue_sizes = {}
    retval = {}
    retval["queue_sizes"] = queue_sizes
    cpu_times = psutil.cpu_times()
    retval["scan_program"] = "health_check"
    retval["timestamp"] = Utility.get_now_string()
    retval["cpu_percent"] = psutil.cpu_percent(percpu=True)
    # NOTE(review): `iowait` is not reported by psutil on every OS; this
    # raises AttributeError where it is missing - confirm target platforms.
    retval["cpu_times"] = {"user": cpu_times.user,
                           "system": cpu_times.system,
                           "idle": cpu_times.idle,
                           "iowait": cpu_times.iowait}
    retval["mem"] = {"free": psutil.virtual_memory().free,
                     "swap_percent_used": psutil.swap_memory().percent}
    retval["root_vol"] = psutil.disk_usage('/').percent
    retval["data_vol"] = psutil.disk_usage('/data/').percent
    retval["application_uptime_seconds"] = application_uptime_s
    return retval
def _system_status(self):
cpu = psutil.cpu_times()
mem = psutil.virtual_memory()
swap = psutil.swap_memory()
def partition_gen():
for partition in psutil.disk_partitions():
usage = psutil.disk_usage(partition.mountpoint)
yield DiskPartition(
device=partition.device,
mount_point=partition.mountpoint,
total=usage.total,
used=usage.used,
free=usage.free,
)
return SystemStatus(
time=int(time.time()),