Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def assert_works_interactively():
read, write = os.openpty()
pty.normalize_newlines(read)
# setsid: this simulates the shell's job-control behavior
proc = Popen(('setsid', 'pgctl', 'debug', 'greeter'), stdin=PIPE, stdout=write)
os.close(write)
try:
assert read_line(read) == 'What is your name?\n'
proc.stdin.write(b'Buck\n')
proc.stdin.flush()
assert read_line(read) == 'Hello, Buck.\n'
finally:
ctrl_c(proc)
proc.wait()
def it_runs_before_debugging_a_service(self):
proc = Popen(('setsid', 'pgctl', 'debug', 'sweet'), stdin=PIPE, stdout=PIPE)
proc.stdin.close()
try:
assert proc.stdout.readline().decode('utf-8') == 'hello, i am a pre-start script in stdout\n'
finally:
ctrl_c(proc)
proc.wait()
def run(cmd, **popen_args):
"""run the command, show the output, and return (stdout, stderr, returncode)"""
from pgctl.subprocess import Popen, PIPE
process = Popen(cmd, stdout=PIPE, stderr=PIPE, **popen_args)
stdout, stderr = process.communicate()
stdout, stderr = stdout.decode('UTF-8'), stderr.decode('UTF-8')
show_both(stdout, stderr)
return stdout, stderr, process.returncode
def it_is_disallowed():
assert_command(
('pgctl', 'start'),
'',
'''\
[pgctl] Starting: sweet
[pgctl] Started: sweet
''',
0,
)
first = Popen(('pgctl', 'restart'), stdout=PIPE, stderr=PIPE)
# slow-shutdown takes two seconds to shut down; aim for the middle:
sleep(1)
second = Popen(('pgctl', 'restart'), stdout=PIPE, stderr=PIPE)
first_stdout, first_stderr = first.communicate()
first_stdout, first_stderr = first_stdout.decode('UTF-8'), first_stderr.decode('UTF-8')
show_both(first_stdout, first_stderr)
assert norm.pgctl(first_stderr) == '''\
[pgctl] Stopping: sweet
[pgctl] ERROR: service 'sweet' failed to stop after {TIME} seconds, its status is ready (pid {PID}) {TIME} seconds
==> playground/sweet/logs/current <==
{TIMESTAMP} sweet
{TIMESTAMP} sweet_error
[pgctl]
assert_command(
('pgctl', 'start'),
'',
'''\
[pgctl] Starting: sweet
[pgctl] Started: sweet
''',
0,
)
first = Popen(('pgctl', 'restart'), stdout=PIPE, stderr=PIPE)
# slow-shutdown takes two seconds to shut down; aim for the middle:
sleep(1)
second = Popen(('pgctl', 'restart'), stdout=PIPE, stderr=PIPE)
first_stdout, first_stderr = first.communicate()
first_stdout, first_stderr = first_stdout.decode('UTF-8'), first_stderr.decode('UTF-8')
show_both(first_stdout, first_stderr)
assert norm.pgctl(first_stderr) == '''\
[pgctl] Stopping: sweet
[pgctl] ERROR: service 'sweet' failed to stop after {TIME} seconds, its status is ready (pid {PID}) {TIME} seconds
==> playground/sweet/logs/current <==
{TIMESTAMP} sweet
{TIMESTAMP} sweet_error
[pgctl]
[pgctl] There might be useful information further up in the log; you can view it by running:
[pgctl] less +G playground/sweet/logs/current
[pgctl] ERROR: Some services failed to stop: sweet
'''
assert first_stdout == ''
def it_disables_polling():
stderr = open('stderr', 'w')
proc = Popen(('pgctl', 'debug', 'unreliable'), stdin=open(os.devnull), stdout=PIPE, stderr=stderr)
def check_file_contents():
expected = '''\
pgctl-poll-ready: disabled during debug -- quitting
'''
with open('stderr') as fd:
actual = fd.read()
return expected == actual
wait_for(check_file_contents)
proc.terminate()
stderr.close()
def svstat_string(service_path):
"""Wrapper for daemontools svstat cmd"""
# svstat *always* exits with code zero...
if not svok(service_path):
return SvStat.UNSUPERVISED
cmd = ('s6-svstat', service_path)
process = Popen(cmd, stdout=PIPE, stderr=STDOUT)
status, _ = process.communicate()
status = status.decode('UTF-8')
#status is listed per line for each argument
return status
def ps(pids):
"""Give a (somewhat) human-readable printout of a list of processes"""
pids = tuple([str(pid) for pid in pids])
if not pids:
return ''
from .subprocess import PIPE, Popen
cmd = ('ps', '--forest', '-wwfj',) + pids
process = Popen(cmd, stdout=PIPE, stderr=PIPE)
stdout, _ = process.communicate()
stdout = stdout.decode('UTF-8')
if stdout.count('\n') > 1:
return stdout
else:
# race condition: we only got the ps -f header
return '' # pragma: no cover, we don't expect to hit this
def svc(args):
"""Wrapper for daemontools svc cmd"""
# svc never writes to stdout.
cmd = ('s6-svc',) + tuple(args)
trace('CMD: %s', cmd)
process = Popen(cmd, stderr=PIPE)
_, error = process.communicate()
if error.startswith(b's6-svc: fatal: unable to control '):
raise Unsupervised(cmd, error)
if process.returncode: # pragma: no cover: there's no known way to hit this.
import sys
sys.stderr.write(error.decode('UTF-8'))
raise CalledProcessError(process.returncode, cmd)