Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def it_only_starts_the_indicated_services(self, in_example_dir, request):
check_call(('pgctl', 'start', 'sleep'))
assert_svstat('playground/sleep', state='up')
assert_svstat('playground/tail', state=SvStat.UNSUPERVISED)
totally unpredictable error message
>>> svstat_parse('down (exitcode 0) 0 seconds, normally up, want wat, ready 0 seconds')
Traceback (most recent call last):
...
ValueError: unexpected value for `process`: 'wat'
>>> svstat_parse('down (exitcode 0) 0 seconds, normally up, want up\x00, ready 0 seconds')
down (exitcode 0) 0 seconds, starting
'''
status = svstat_string.strip()
trace('RAW : %s', status)
if status.startswith(('up ', 'down ')):
state, buffer = parse(status, '', ' ')
elif status.startswith('unable to chdir:'):
return SvStat(SvStat.INVALID, None, None, None, None)
elif (
status.startswith('s6-svstat: fatal: unable to read status for ') and status.endswith((
': No such file or directory',
': Broken pipe',
))
):
# the service has never been started before; it's down.
return SvStat(SvStat.UNSUPERVISED, None, None, None, None)
else: # unknown errors
return SvStat(status, None, None, None, None)
pid, buffer = parse(buffer, '(pid ', ') ', int)
exitcode, buffer = parse(buffer, '(exitcode ', ') ', int)
_, buffer = parse(buffer, '(signal ', ') ')
seconds, buffer = parse(buffer, '', ' seconds', int)
status = svstat_string.strip()
trace('RAW : %s', status)
if status.startswith(('up ', 'down ')):
state, buffer = parse(status, '', ' ')
elif status.startswith('unable to chdir:'):
return SvStat(SvStat.INVALID, None, None, None, None)
elif (
status.startswith('s6-svstat: fatal: unable to read status for ') and status.endswith((
': No such file or directory',
': Broken pipe',
))
):
# the service has never been started before; it's down.
return SvStat(SvStat.UNSUPERVISED, None, None, None, None)
else: # unknown errors
return SvStat(status, None, None, None, None)
pid, buffer = parse(buffer, '(pid ', ') ', int)
exitcode, buffer = parse(buffer, '(exitcode ', ') ', int)
_, buffer = parse(buffer, '(signal ', ') ')
seconds, buffer = parse(buffer, '', ' seconds', int)
buffer = buffer.lstrip(', ')
# we actually dont care about this value
_, buffer = parse(buffer, 'normally ', ', ')
process, buffer = parse(buffer, 'want ', ', ')
if process is not None:
process = process.strip('\x00') # s6 microbug
if process == 'up':
process = 'starting'