Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _kill_child(pid, sign):
    """
    Signal the process ``pid`` and, recursively, each of its children.

    The process table is read with ``ps -ef`` filtered through ``grep pid``;
    awk keeps columns 2 and 3 of every matching line (PID and PPID in the
    usual ``ps -ef`` layout). Children are signalled before their parent.

    :param pid:
        process id to signal. It is converted to ``str`` because it is
        compared against the textual fields of the ``ps`` output.
    :param sign:
        signal selector: ``''`` for a plain ``kill``; ``'9'``/``'-9'``;
        ``'SIGSTOP'``/``'19'``/``'-19'``; ``'SIGCONT'``/``'18'``/``'-18'``.
        Any other value only logs ``'sign error'``.
    :return:
        None
    """
    # An int pid would never equal the string fields parsed below, which
    # silently turned the whole call into a no-op.
    pid = str(pid)
    cmd = 'ps -ef|grep %s|grep -v grep|awk \'{print $2,$3}\'' % (pid)
    ret = cup.shell.ShellExec().run(cmd, 10)
    for proc in ret['stdout'].strip().split('\n'):
        fields = proc.split()
        # Empty output produces a single '' entry; skip it (and any other
        # malformed line) instead of raising IndexError on fields[1].
        if len(fields) < 2:
            continue
        proc_id, parent_id = fields[0], fields[1]
        if parent_id == pid:
            # This line describes a child of ``pid``: handle its subtree first.
            _kill_child(proc_id, sign)
        if proc_id == pid:
            if len(sign) == 0:
                cup.shell.execshell('kill %s' % pid)
            elif sign in ('9', '-9'):
                cup.shell.execshell('kill -9 %s' % pid)
            elif sign in ('SIGSTOP', '19', '-19'):
                cup.shell.execshell('kill -19 %s' % pid)
            elif sign in ('SIGCONT', '18', '-18'):
                cup.shell.execshell('kill -18 %s' % pid)
            else:
                cup.log.error('sign error')
exec working directory. Plz use
:param tostr:
recipt list, separated by ,
:param subject:
subject
:param body:
email content
:param attach:
email attachment
:param content_is_html:
is htm mode opened
:return:
return True on success, False otherwise
"""
decorators.needlinux(mutt_sendmail)
shellobj = shell.ShellExec()
temp_cwd = os.getcwd()
str_att = ''
cmdstr = ''
if attach == '':
if content_is_html is True:
cmdstr = 'echo "%s"|/usr/bin/mutt -e "my_hdr Content-Type:'\
'text/html" -s "%s" %s' \
% (body, subject, tostr)
else:
cmdstr = 'echo "%s"|/usr/bin/mutt -s "%s" %s' % (
body, subject, tostr
)
else:
attlist = attach.strip().split(',')
attlen = len(attlist)
:return:
return None if not found. Otherwise, return the pid
"""
cmd = (
'ps -ef|grep \'%s\'|grep -v grep|grep -vwE "vim |less |vi |tail |cat |more "'
'|awk \'{print $2}\''
) % (grep_string)
ret = cup.shell.ShellExec().run(cmd, 10)
pids = ret['stdout'].strip().split('\n')
if len(pids) == 0 or len(pids) == 1 and len(pids[0]) == 0:
return None
for pid in pids:
for sel_path in ["cwd", "exe"]:
cmd = 'ls -l /proc/%s/%s|awk \'{print $11}\' ' % (pid, sel_path)
ret = cup.shell.ShellExec().run(cmd, 10)
pid_path = ret['stdout'].strip().strip()
if pid_path.find(process_path) == 0:
return pid
return None
@decorators.needlinux
def __is_port_used(port):
    """
    Internal helper: report whether ``netstat -nl`` lists ``port``.

    :param port:
        port number, interpolated into the grep pattern ``':<port> '``
    :return:
        True when the pipeline exits with return code 0 and prints
        something other than whitespace; False otherwise.
    """
    result = cup.shell.ShellExec().run(
        "netstat -nl | grep ':%s '" % (port), 10
    )
    # A non-zero exit status of the pipeline is reported as "not used".
    if result['returncode'] != 0:
        return False
    matched = result['stdout'].strip()
    return len(matched) != 0
return __is_port_used(port)
def updatekv(self, confpath, key, val):
    """
    Update ``key`` with ``val`` in the conf file ``confpath``.

    Runs ``<self._modtool> -c <confpath> -u <key>:<val>`` through
    ``cup.shell.ShellExec`` and retries while the return code is non-zero:
    at most three attempts, printing ``err:updatekv`` and sleeping one
    second after each of the first two failures.

    :param confpath:
        conf file path passed to the tool with ``-c``
    :param key:
        key to update
    :param val:
        new value for the key
    :return:
        None. A failure that persists through the last attempt is not
        reported to the caller.
    """
    cmd = "%s -c %s -u %s:%s " % (self._modtool, confpath, key, val)
    try_times = 0
    while True:
        # 120 is presumably a timeout in seconds -- confirm against ShellExec.
        ret = cup.shell.ShellExec().run(cmd, 120)
        # A falsy return code (0 or None) counts as success; once two
        # retries have been spent the third result is accepted as-is.
        if not ret['returncode'] or try_times > 1:
            break
        try_times += 1
        print('err:updatekv')
        time.sleep(1)
    # The tool's stdout is not used by this method, so it is intentionally
    # not transcoded: the former GBK -> UTF-8 round trip discarded its
    # result and could only raise (AttributeError on text output,
    # UnicodeDecodeError on non-GBK bytes) after the update had already run.
def addkv(self, confpath, key, val):
    """
    Add ``key``/``val`` to the conf file ``confpath``.

    Runs ``<self._modtool> -c <confpath> -i <key>:<val> &>/dev/null``
    through ``cup.shell.ShellExec`` and retries while the return code is
    non-zero: at most three attempts, printing ``err:addkv`` and sleeping
    one second after each of the first two failures. The captured stdout
    of the last attempt is printed, followed by the command line itself
    when both retries were used up.

    :param confpath:
        conf file path passed to the tool with ``-c``
    :param key:
        key to insert
    :param val:
        value for the key
    :return:
        None. A failure that persists through the last attempt is not
        reported to the caller.
    """
    cmd = "%s -c %s -i %s:%s &>/dev/null" % (
        self._modtool, confpath, key, val
    )
    try_times = 0
    while True:
        # 120 is presumably a timeout in seconds -- confirm against ShellExec.
        ret = cup.shell.ShellExec().run(cmd, 120)
        # A falsy return code (0 or None) counts as success; once two
        # retries have been spent the third result is accepted as-is.
        if not ret['returncode'] or try_times > 1:
            break
        try_times += 1
        print('err:addkv')
        time.sleep(1)
    output = ret['stdout']
    if isinstance(output, bytes):
        # Raw tool output is treated as GBK, as before.
        output = output.decode('gbk')
        if str is bytes:
            # Python 2 only: keep printing UTF-8 encoded bytes as before.
            output = output.encode('utf-8')
    print(output)
    # The old ``ret == 0`` test compared the result dict with an int and was
    # always False; only the exhausted-retries condition ever printed cmd.
    if try_times > 1:
        print(cmd)
def delkv(self, confpath, key):
    """
    Delete ``key`` from the conf file ``confpath``.

    Runs ``<self._modtool> -c <confpath> -d <key>`` through
    ``cup.shell.ShellExec`` and retries while the return code is non-zero:
    at most three attempts, printing ``err:delkv`` and sleeping one second
    after each of the first two failures. The captured stdout of the last
    attempt is printed, followed by the command line itself when both
    retries were used up.

    :param confpath:
        conf file path passed to the tool with ``-c``
    :param key:
        key to delete
    :return:
        None. A failure that persists through the last attempt is not
        reported to the caller.
    """
    cmd = "%s -c %s -d %s " % (self._modtool, confpath, key)
    try_times = 0
    while True:
        # 120 is presumably a timeout in seconds -- confirm against ShellExec.
        ret = cup.shell.ShellExec().run(cmd, 120)
        # A falsy return code (0 or None) counts as success; once two
        # retries have been spent the third result is accepted as-is.
        if not ret['returncode'] or try_times > 1:
            break
        try_times += 1
        print('err:delkv')
        time.sleep(1)
    output = ret['stdout']
    if isinstance(output, bytes):
        # Raw tool output is treated as GBK, as before.
        output = output.decode('gbk')
        if str is bytes:
            # Python 2 only: keep printing UTF-8 encoded bytes as before.
            output = output.encode('utf-8')
    print(output)
    # The old ``ret == 0`` test compared the result dict with an int and was
    # always False; only the exhausted-retries condition ever printed cmd.
    if try_times > 1:
        print(cmd)