Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_halfway_terminated_process(self):
# Test that NoSuchProcess exception gets raised in case the
# process dies after we create the Process object.
# Example:
# >>> proc = Process(1234)
# >>> time.sleep(2) # time-consuming task, process dies in meantime
# >>> proc.name()
# Refers to Issue #15
sproc = get_test_subprocess()
p = psutil.Process(sproc.pid)
p.terminate()
p.wait()
if WINDOWS:
call_until(psutil.pids, "%s not in ret" % p.pid)
self.assertFalse(p.is_running())
# self.assertFalse(p.pid in psutil.pids(), msg="retcode = %s" %
# retcode)
excluded_names = ['pid', 'is_running', 'wait', 'create_time']
if LINUX and not RLIMIT_SUPPORT:
excluded_names.append('rlimit')
for name in dir(p):
if (name.startswith('_') or
name in excluded_names):
continue
try:
meth = getattr(p, name)
# get/set methods
if name == 'nice':
if POSIX:
def stop_modbus():
'''Stop all virtual modbus devices'''
for pid in psutil.pids():
proc = psutil.Process(pid)
for opt in proc.cmdline():
if 'modbus' in opt:
print('Killing: ', opt)
os.kill(pid, signal.SIGTERM)
break
error_and_exit("Can't have no command but have command args",
"usage with command is: "
"howlong [options] -c your_command your_options")
self.log_file = parsed_args.f[0] if parsed_args.f else None
if self.log_file:
logging.basicConfig(filename=self.log_file, level=self.log_level,
format='%(levelname)s:%(message)s')
else:
logging.basicConfig(level=self.log_level,
format='%(levelname)s:%(message)s')
if parsed_args.p:
self.pid = int(parsed_args.p[0])
self.command = None
if self.pid not in psutil.pids():
error_and_exit("argument p must be a valid pid, "
"%d is not one" % self.pid)
else:
if parsed_args.m:
try:
with open('history.txt', 'r') as f:
history = f.readlines()
for line in history:
logging.info(line.strip())
except:
logging.warning("No history avaialable\n")
exit()
elif parsed_args.mc:
if os.path.isfile('history.txt'):
def collect_volatile(self):
ps_list = list()
open_files = list()
for p in psutil.pids():
try:
ps_list.append(psutil.Process(p).as_dict())
for f in psutil.Process(p).open_files():
tmp = f._asdict()
tmp['pid'] = p
open_files.append(json.dumps(tmp))
except psutil._exceptions.AccessDenied as e:
self.logger.error("Psutil error: {}".format(e))
except psutil._exceptions.ZombieProcess as e:
self.logger.error("Psutil error: {}".format(e))
except OSError as e:
self.logger.error("Psutil error: {}".format(e))
self.output_volatile('ps_list', ps_list)
self.output_volatile('open_files', open_files)
try:
def _enumerate_processes():
for pid in psutil.pids():
yield (pid, psutil.Process(pid).name())
start = datetime.datetime.strptime(metadata['startedDatetime'], "%Y-%m-%dT%H:%M:%S.%f")
end = None if metadata['endedDatetime'] is None else datetime.datetime.strptime(metadata['endedDatetime'], "%Y-%m-%dT%H:%M:%S.%f")
if end is None:
duration = datetime.datetime.now() - start
else:
duration = end - start
duration = utils.floor_timedelta(duration)
status = metadata['status']
tags = sorted(metadata['tags'])
file_space = utils.get_file_space_representation(str(path/c.FILES_FOLDER))
experiment_pid = int(metadata['pid'])
pids = psutil.pids()
pid_icon_name = 'fas fa-play text-success' if experiment_pid in pids else 'fas fa-stop text-danger'
# Set lightbulb class:
if metadata['status'] == 'running':
lightbulb_class = 'text-{}'.format('primary' if metadata['conclusion'] else 'secondary')
else:
lightbulb_class = 'text-{}'.format('primary' if metadata['conclusion'] else 'danger')
infoicon_class = 'text-primary' if metadata['description'] else 'text-secondary'
arguments = utils.arguments_to_string(metadata['arguments'])
procedure_item_by_column = {
'select-row': '',
'DetailsControl': '',
'UUID': uuid,
def update(self):
self.procs = len(psutil.pids())
def running(self):
import psutil
pid = self.pid
return pid and pid in psutil.pids()
def stopCoordinator(self):
coordProc = []
try:
import psutil
except Exception as e:
QMessageBox.warning(self.messageParent, "PandoraCoordinator", "Failed to check if the coordinator is running:\n\n%s" % str(e))
return
for x in psutil.pids():
try:
if os.path.basename(psutil.Process(x).exe()) == "PandoraCoordinator.exe":
coordProc.append(x)
except:
pass
if len(coordProc) > 0:
cData = {}
cData["localMode"] = ["globals", "localMode"]
cData["rootPath"] = ["globals", "rootPath"]
cData["slavePath"] = ["coordinator", "rootPath"]
cData = self.getConfig(data=cData)
if cData["localMode"] == True:
coordRoot = cData["rootPath"]
else: