Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Helper for running the SUT (system under test) as a subprocess within a pty.
# It does two things:
# * tests the app running in a pty in a subprocess
# * collects test coverage from that subprocess
# Documentation:
# http://blog.fizyk.net.pl/blog/gathering-tests-coverage-for-subprocesses-in-python.html
# True when the running interpreter's major version is 3 or higher.
PY3 = sys.version_info >= (3,)
if PY3:
    # ``basestring`` is not defined on Python 3; alias it to ``str`` so code
    # written against the Python 2 name keeps working.
    basestring = str
class SimplePty(PtyProcess):
"""Simple wrapper around a process running in a pseudoterminal.
This class exposes a similar interface to :class:`PtyProcess`, but its read
methods return unicode, and its :meth:`write` accepts unicode.
"""
def __init__(self, pid, fd, encoding='utf-8', codec_errors='strict'):
    """Initialise the wrapper and prepare an incremental text decoder.

    ``pid`` and ``fd`` are passed unchanged to the parent initialiser.
    ``encoding`` and ``codec_errors`` are stored on the instance and used to
    build the incremental decoder kept in ``self.decoder``.
    """
    super(SimplePty, self).__init__(pid, fd)
    self.encoding, self.codec_errors = encoding, codec_errors
    # Look up the decoder class for the encoding, then instantiate it with
    # the requested error-handling mode.
    decoder_factory = codecs.getincrementaldecoder(encoding)
    self.decoder = decoder_factory(errors=codec_errors)
def read(self, size=1024):
"""Read at most ``size`` bytes from the pty, return them as unicode.
Can block if there is nothing to read. Raises :exc:`EOFError` if the
import time
import os
if sys.platform.startswith("win"):
import winpty
else:
import ptyprocess
# Names exported by ``from <module> import *``.
__all__ = ["PtyProcess", "Screen", "ByteStream", "Terminal"]

# Pick the pty implementation to subclass: winpty when ``sys.platform``
# starts with "win", ptyprocess on every other platform.
ParentPtyProcess = (
    winpty.PtyProcess
    if sys.platform.startswith("win")
    else ptyprocess.PtyProcess
)
class PtyProcess(ParentPtyProcess):
    """Pty process wrapper whose ``read``/``write`` convert text on Windows.

    When ``sys.platform`` starts with ``"win"`` the parent's ``read`` result
    is UTF-8 encoded and the ``write`` argument is UTF-8 decoded before being
    passed on; on other platforms both calls are forwarded untouched.
    """

    def read(self, nbytes):
        """Read via the parent class, UTF-8 encoding the result on Windows."""
        chunk = super(PtyProcess, self).read(nbytes)
        if sys.platform.startswith("win"):
            chunk = chunk.encode("utf-8")
        return chunk

    def write(self, data):
        """Write via the parent class, UTF-8 decoding ``data`` on Windows."""
        if sys.platform.startswith("win"):
            data = data.decode("utf-8")
        super(PtyProcess, self).write(data)
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
'''
from __future__ import print_function
import pexpect
import unittest
from . import PexpectTestCase
import time
import sys
from ptyprocess import ptyprocess
ptyprocess._make_eof_intr()
# ``byte(i)`` turns an integer into a one-element byte string.
if sys.version_info[0] < 3:
    # On Python 2, ``chr`` already returns a one-character byte string.
    byte = chr
else:
    def byte(i):
        """Return a ``bytes`` object holding the single value ``i``."""
        return bytes((i,))
class TestCtrlChars(PexpectTestCase.PexpectTestCase):
def setUp(self):
    """Run the base-class setup, then build the command line for getch.py."""
    super(TestCtrlChars, self).setUp()
    interpreter = self.PYTHONBIN
    self.getch_cmd = interpreter + ' getch.py'
def test_control_chars(self):
'''This tests that we can send all 256 8-bit characters to a child
process.'''
child = pexpect.spawn(self.getch_cmd, echo=False, timeout=5)
def test_sendeof(self):
child = pexpect.spawn(self.getch_cmd, echo=False, timeout=5)
child.expect('READY')
child.sendeof()
child.expect(str(ord(ptyprocess._EOF)) + '
def test_sendintr (self):
child = pexpect.spawn(self.getch_cmd, echo=False, timeout=5)
child.expect('READY')
child.sendintr()
child.expect(str(ord(ptyprocess._INTR)) + '
def open(self):
"""Open ThreadedTerminal connection & start thread pulling data from it."""
ret = super(ThreadedTerminal, self).open()
if not self._terminal:
self._terminal = PtyProcessUnicode.spawn(self._cmd, dimensions=self.dimensions)
# need to not replace not unicode data instead of raise exception
self._terminal.decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
done = Event()
self.pulling_thread = TillDoneThread(target=self.pull_data,
done_event=done,
kwargs={'pulling_done': done})
self.pulling_thread.start()
retry = 0
is_operable = False
while (retry < 3) and (not is_operable):
is_operable = self._shell_operable.wait(timeout=1)
if not is_operable:
self.logger.warning("Terminal open but not fully operable yet")
self._terminal.write('\n')
def disconnect(self):
# Stop the VPN process, if there is one, and reset this object's connection
# state. Logs 'Disconnected' afterwards, or a 'Not connected' warning when
# ``self.vpn_process`` is None.
if self.vpn_process is not None:
try:
self._terminate()
except ValueError:
# _terminate() raised ValueError; fall back to _kill().
self._kill()
time.sleep(0.5)
# Keep trying to reap the process for as long as self._is_running is truthy.
while self._is_running:
if self.use_pexpect:
try:
# NOTE(review): presumably close(True) forces the child to close -- confirm
# against the ptyprocess/pexpect API.
self.vpn_process.close(True)
except ptyprocess.ptyprocess.PtyProcessError:
# Error deliberately ignored (best-effort close).
pass
else:
self.vpn_process.wait()
time.sleep(0.1)
self.connected = False
try:
self.instances.remove(self)
except ValueError:
# remove() raised ValueError (e.g. self was not in the collection); ignore.
pass
# Clear the process, ip and conf_file attributes in one chained assignment.
self.vpn_process = self.ip = self.conf_file = None
self.log.info('Disconnected')
else:
self.log.warning('Not connected')
def _spawnpty(self, args, **kwargs):
    '''Start ``args`` in a new pty and return the resulting PtyProcess.

    All keyword arguments are forwarded unchanged to ``PtyProcess.spawn``.
    '''
    spawn = ptyprocess.PtyProcess.spawn
    return spawn(args, **kwargs)
assert self.pid is None, 'The pid member must be None.'
assert self.command is not None, 'The command member must not be None.'
kwargs = {'echo': self.echo, 'preexec_fn': preexec_fn}
if self.ignore_sighup:
def preexec_wrapper():
"Set SIGHUP to be ignored, then call the real preexec_fn"
signal.signal(signal.SIGHUP, signal.SIG_IGN)
if preexec_fn is not None:
preexec_fn()
kwargs['preexec_fn'] = preexec_wrapper
if dimensions is not None:
kwargs['dimensions'] = dimensions
self.ptyproc = ptyprocess.PtyProcess.spawn(self.args, env=self.env,
cwd=self.cwd, **kwargs)
self.pid = self.ptyproc.pid
self.child_fd = self.ptyproc.fd
self.terminated = False
self.closed = False
def _spawnpty(self, args, **kwargs):
    '''Start ``args`` in a new pty and return the resulting PtyProcess.

    All keyword arguments are forwarded unchanged to ``PtyProcess.spawn``.
    '''
    spawn = ptyprocess.PtyProcess.spawn
    return spawn(args, **kwargs)