How to use the ptyprocess.PtyProcess function in ptyprocess

To help you get started, we’ve selected a few ptyprocess examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github markfink / whaaaaat / tests / helpers.py View on Github external
# helper for running sut as subprocess within pty
# does two things
# * test app running in pty in subprocess
# * get test coverage from subprocess

# docu:
# http://blog.fizyk.net.pl/blog/gathering-tests-coverage-for-subprocesses-in-python.html


# True when the running interpreter is Python 3 or newer.
PY3 = sys.version_info >= (3,)

# On Python 3, bind the Python 2 name ``basestring`` to ``str``.
if PY3:
    basestring = str


class SimplePty(PtyProcess):
    """Simple wrapper around a process running in a pseudoterminal.

    This class exposes a similar interface to :class:`PtyProcess`, but its read
    methods return unicode, and its :meth:`write` accepts unicode.
    """

    def __init__(self, pid, fd, encoding='utf-8', codec_errors='strict'):
        """Initialise the wrapper and prepare an incremental decoder.

        ``pid`` and ``fd`` are forwarded unchanged to the parent initialiser.
        ``encoding`` and ``codec_errors`` are stored on the instance and are
        also used to build the incremental decoder kept in ``self.decoder``.
        """
        super(SimplePty, self).__init__(pid, fd)
        self.encoding, self.codec_errors = encoding, codec_errors
        # Look up the decoder class first, then instantiate it with the
        # requested error-handling mode.
        decoder_factory = codecs.getincrementaldecoder(encoding)
        self.decoder = decoder_factory(errors=codec_errors)

    def read(self, size=1024):
        """Read at most ``size`` bytes from the pty, return them as unicode.

        Can block if there is nothing to read. Raises :exc:`EOFError` if the
github randy3k / radian / tests / terminal.py View on Github external
import time
import os

if sys.platform.startswith("win"):
    import winpty
else:
    import ptyprocess


# Names exported by a star-import of this module.
__all__ = ["PtyProcess", "Screen", "ByteStream", "Terminal"]


# Base class used by the PtyProcess wrapper: winpty's implementation when
# sys.platform starts with "win", ptyprocess's implementation otherwise.
ParentPtyProcess = (
    winpty.PtyProcess
    if sys.platform.startswith("win")
    else ptyprocess.PtyProcess
)


class PtyProcess(ParentPtyProcess):
    """Platform-neutral wrapper around the selected pty backend.

    When ``sys.platform`` starts with ``"win"``, the parent's ``read`` result
    is encoded to UTF-8 and the ``write`` argument is decoded from UTF-8
    before being forwarded. On every other platform both calls pass their
    data through unchanged.
    """

    def read(self, nbytes):
        """Read via the parent class, UTF-8 encoding the result on Windows."""
        chunk = super(PtyProcess, self).read(nbytes)
        if sys.platform.startswith("win"):
            chunk = chunk.encode("utf-8")
        return chunk

    def write(self, data):
        """Forward ``data`` to the parent class, UTF-8 decoding it on Windows."""
        payload = data
        if sys.platform.startswith("win"):
            payload = data.decode("utf-8")
        super(PtyProcess, self).write(payload)
github catboost / catboost / contrib / python / pexpect / pexpect / pty_spawn.py View on Github external
assert self.pid is None, 'The pid member must be None.'
        assert self.command is not None, 'The command member must not be None.'

        kwargs = {'echo': self.echo, 'preexec_fn': preexec_fn}
        if self.ignore_sighup:
            def preexec_wrapper():
                "Set SIGHUP to be ignored, then call the real preexec_fn"
                signal.signal(signal.SIGHUP, signal.SIG_IGN)
                if preexec_fn is not None:
                    preexec_fn()
            kwargs['preexec_fn'] = preexec_wrapper

        if dimensions is not None:
            kwargs['dimensions'] = dimensions

        self.ptyproc = ptyprocess.PtyProcess.spawn(self.args, env=self.env,
                                                   cwd=self.cwd, **kwargs)

        self.pid = self.ptyproc.pid
        self.child_fd = self.ptyproc.fd


        self.terminated = False
        self.closed = False
github pypa / pipenv / pipenv / vendor / pexpect / pty_spawn.py View on Github external
def _spawnpty(self, args, **kwargs):
        '''Spawn a pty and return an instance of PtyProcess.

        ``args`` and all keyword arguments are forwarded unchanged to
        ``ptyprocess.PtyProcess.spawn``; its result is returned as-is.
        '''
        spawn = ptyprocess.PtyProcess.spawn
        return spawn(args, **kwargs)
github pexpect / pexpect / pexpect / pty_spawn.py View on Github external
def _spawnpty(self, args, **kwargs):
        '''Spawn a pty and return an instance of PtyProcess.

        ``args`` and all keyword arguments are forwarded unchanged to
        ``ptyprocess.PtyProcess.spawn``; its result is returned as-is.
        '''
        spawn = ptyprocess.PtyProcess.spawn
        return spawn(args, **kwargs)
github sedwards2009 / extraterm / extensions / ProxySessionBackend / src / python / ptyserver2.py View on Github external
cwd = cmd["cwd"]
    if cwd == "" or cwd is None:
        cwd = None
    else:
        if not os.path.exists(cwd):
            cwd = None

    # Fix up the PATH variable on cygwin.
    if sys.platform == "cygwin":
        if "Path" in env and "PATH" not in env:
            env["PATH"] = env["Path"]
            del env["Path"]
        env["PATH"] = cygwin_convert_path_variable(env["PATH"])
    try:
        pty = ptyprocess.PtyProcess.spawn(cmd["argv"], dimensions=(rows, columns), env=env, cwd=cwd)
    except FileNotFoundError:
        pty = DeadPty(cmd["argv"])

    pty_reader = NonblockingFileReader(read=pty.read)
    pty_writer = NonblockingFileWriter(write=pty.write)

    pty_id = pty_counter
    pty_list.append({
        "id": pty_id,
        "pty": pty,
        "reader": pty_reader,
        "readDecoder": codecs.lookup("utf8").incrementaldecoder(errors="ignore"),
        "writer": pty_writer})
    pty_counter += 1
    
    send_to_controller({ "type": "created", "id": pty_id })
github apple / swift-lldb / third_party / Python / module / pexpect-4.6 / pexpect / pty_spawn.py View on Github external
def _spawnpty(self, args, **kwargs):
        '''Spawn a pty and return an instance of PtyProcess.

        ``args`` and all keyword arguments are forwarded unchanged to
        ``ptyprocess.PtyProcess.spawn``; its result is returned as-is.
        '''
        spawn = ptyprocess.PtyProcess.spawn
        return spawn(args, **kwargs)
github kubessh / kubessh / kubessh / pod.py View on Github external
kubectl_command = [
            'kubectl',
            '--namespace', self.namespace,
            'exec',
            '-c', 'shell',
            '--stdin'
            ] + tty_args + [
            self.pod_name,
            '--'
        ] + command

        # FIXME: Is this async friendly?
        if ssh_process.get_terminal_type():
            # PtyProcess and asyncssh disagree on ordering of terminal size
            ts = ssh_process.get_terminal_size()
            process = PtyProcess.spawn(argv=kubectl_command, dimensions=(ts[1], ts[0]))
            await ssh_process.redirect(process, process, process)

            loop = asyncio.get_event_loop()

            # Future for spawned process dying
            # We explicitly create a thread pool of 1 thread for every run_in_executor call
            # to help reason about interaction between asyncio and threads. A global threadpool
            # is fine when using it as a queue (when doing HTTP requests, for example), but not
            # here since we could end up deadlocking easily.
            shell_completed = loop.run_in_executor(ThreadPoolExecutor(1), process.wait)
            # Future for ssh connection closing
            read_stdin = asyncio.ensure_future(ssh_process.stdin.read())

            # This loop is here to pass TerminalSizeChanged events through to ptyprocess
            # It needs to break when the ssh connection is gone or when the spawned process is gone.
            # See https://github.com/ronf/asyncssh/issues/134 for info on how this works
github daveleroy / sublime_debugger / modules / debugger / terminal.py View on Github external
from .views import css
from .autocomplete import Autocomplete

import os
import threading
import re
import sublime


try:
	# Choose the pty backend from core.platform.windows; either import may
	# raise ImportError, which is handled below.
	if not core.platform.windows:
		from ptyprocess import PtyProcess as _PtyProcess

		class PtyProcess(_PtyProcess):
			"""ptyprocess backend whose ``read`` returns UTF-8 decoded text."""

			def read(self):
				raw = super().read()
				return raw.decode('utf-8')
	else:
		from winpty import PtyProcess

except ImportError:
	# this stuff is broken on > 4000 until the packages are 3.8 compatible
	SUPPORTED = False

else:
	# Reached only when the backend import above succeeded.
	SUPPORTED = True

class TtyProcess:
	def __init__(self, command: List[str], on_output: Optional[Callable[[str], None]], on_close: Optional[Callable[[], None]] = None, cwd=None) -> None:
		print('Starting process: {}'.format(command))

		self.process = PtyProcess.spawn(command, cwd=cwd)
		self.pid = self.process.pid
		self.on_close = on_close
github sedwards2009 / extraterm / extraterm / src / main_process / pty / python / ptyserver2.py View on Github external
global pty_list
    global pty_counter
    
    # Create a new pty.
    rows = cmd["rows"]
    columns = cmd["columns"]
    env = cmd["env"]
    
    # Fix up the PATH variable on cygwin.
    if sys.platform == "cygwin":
        if "Path" in env and "PATH" not in env:
            env["PATH"] = env["Path"]
            del env["Path"]
        env["PATH"] = cygwin_convert_path_variable(env["PATH"])

    pty = ptyprocess.PtyProcess.spawn(cmd["argv"], dimensions=(rows, columns), env=env) #cwd=, )
    pty_reader = NonblockingFileReader(read=pty.read)
    pty_writer = NonblockingFileWriter(write=pty.write)

    pty_id = pty_counter
    pty_list.append({
        "id": pty_id,
        "pty": pty,
        "reader": pty_reader,
        "readDecoder": codecs.lookup("utf8").incrementaldecoder(errors="ignore"),
        "writer": pty_writer})
    pty_counter += 1
    
    send_to_controller({ "type": "created", "id": pty_id })
    return True