Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Helper for running the system under test (SUT) as a subprocess within a pty.
# It does two things:
# * tests the app running in a pty in a subprocess
# * gathers test coverage from that subprocess
# Documentation:
# http://blog.fizyk.net.pl/blog/gathering-tests-coverage-for-subprocesses-in-python.html
# Python 2/3 compatibility shim: Python 3 has no ``basestring`` builtin, so
# alias it to ``str`` there.
PY3 = sys.version_info >= (3,)
if PY3:
    basestring = str
class SimplePty(PtyProcess):
"""Simple wrapper around a process running in a pseudoterminal.
This class exposes a similar interface to :class:`PtyProcess`, but its read
methods return unicode, and its :meth:`write` accepts unicode.
"""
def __init__(self, pid, fd, encoding='utf-8', codec_errors='strict'):
    """Wrap an existing pty child and prepare a decoder for its output.

    ``pid`` and ``fd`` are handed unchanged to the parent constructor.
    ``encoding`` and ``codec_errors`` are kept on the instance and are
    also used to build the incremental decoder stored in ``self.decoder``.
    """
    super(SimplePty, self).__init__(pid, fd)
    self.encoding = encoding
    self.codec_errors = codec_errors
    # An incremental decoder keeps state between calls to ``decode``.
    make_decoder = codecs.getincrementaldecoder(encoding)
    self.decoder = make_decoder(errors=codec_errors)
def read(self, size=1024):
"""Read at most ``size`` bytes from the pty, return them as unicode.
Can block if there is nothing to read. Raises :exc:`EOFError` if the
import time
import os
if sys.platform.startswith("win"):
import winpty
else:
import ptyprocess
__all__ = ["PtyProcess", "Screen", "ByteStream", "Terminal"]
# Select the pty backend for the current platform. Only the chosen branch
# of the conditional expression is evaluated, so the module that was not
# imported above is never referenced.
ParentPtyProcess = (
    winpty.PtyProcess
    if sys.platform.startswith("win")
    else ptyprocess.PtyProcess
)
class PtyProcess(ParentPtyProcess):
    """Pty process whose ``read`` and ``write`` use the same types everywhere.

    On Windows the value read from the parent class is encoded to UTF-8
    before being returned, and the value passed to ``write`` is decoded
    from UTF-8 before being handed to the parent. On every other platform
    both calls are passed straight through.
    """

    def read(self, nbytes):
        """Read up to ``nbytes`` from the pty.

        Returns whatever the parent ``read`` returns, UTF-8 encoded on
        Windows. Exceptions raised by the parent ``read`` propagate.
        """
        chunk = super(PtyProcess, self).read(nbytes)
        if sys.platform.startswith("win"):
            return chunk.encode("utf-8")
        return chunk

    def write(self, data):
        """Write ``data`` to the pty and return the parent's result.

        On Windows ``data`` is decoded from UTF-8 first. The parent's
        return value is passed back to the caller instead of being
        dropped, so callers that want to check it can do so; callers that
        ignored the previous ``None`` are unaffected.
        """
        if sys.platform.startswith("win"):
            data = data.decode("utf-8")
        return super(PtyProcess, self).write(data)
assert self.pid is None, 'The pid member must be None.'
assert self.command is not None, 'The command member must not be None.'
kwargs = {'echo': self.echo, 'preexec_fn': preexec_fn}
if self.ignore_sighup:
def preexec_wrapper():
"Set SIGHUP to be ignored, then call the real preexec_fn"
signal.signal(signal.SIGHUP, signal.SIG_IGN)
if preexec_fn is not None:
preexec_fn()
kwargs['preexec_fn'] = preexec_wrapper
if dimensions is not None:
kwargs['dimensions'] = dimensions
self.ptyproc = ptyprocess.PtyProcess.spawn(self.args, env=self.env,
cwd=self.cwd, **kwargs)
self.pid = self.ptyproc.pid
self.child_fd = self.ptyproc.fd
self.terminated = False
self.closed = False
def _spawnpty(self, args, **kwargs):
    '''Start ``args`` in a new pty and hand back the PtyProcess object.

    ``args`` and every keyword argument are forwarded unchanged to
    ``ptyprocess.PtyProcess.spawn``.
    '''
    spawn = ptyprocess.PtyProcess.spawn
    return spawn(args, **kwargs)
def _spawnpty(self, args, **kwargs):
    '''Start ``args`` in a new pty and hand back the PtyProcess object.

    ``args`` and every keyword argument are forwarded unchanged to
    ``ptyprocess.PtyProcess.spawn``.
    '''
    spawn = ptyprocess.PtyProcess.spawn
    return spawn(args, **kwargs)
cwd = cmd["cwd"]
if cwd == "" or cwd is None:
cwd = None
else:
if not os.path.exists(cwd):
cwd = None
# Fix up the PATH variable on cygwin.
if sys.platform == "cygwin":
if "Path" in env and "PATH" not in env:
env["PATH"] = env["Path"]
del env["Path"]
env["PATH"] = cygwin_convert_path_variable(env["PATH"])
try:
pty = ptyprocess.PtyProcess.spawn(cmd["argv"], dimensions=(rows, columns), env=env, cwd=cwd)
except FileNotFoundError:
pty = DeadPty(cmd["argv"])
pty_reader = NonblockingFileReader(read=pty.read)
pty_writer = NonblockingFileWriter(write=pty.write)
pty_id = pty_counter
pty_list.append({
"id": pty_id,
"pty": pty,
"reader": pty_reader,
"readDecoder": codecs.lookup("utf8").incrementaldecoder(errors="ignore"),
"writer": pty_writer})
pty_counter += 1
send_to_controller({ "type": "created", "id": pty_id })
def _spawnpty(self, args, **kwargs):
    '''Start ``args`` in a new pty and hand back the PtyProcess object.

    ``args`` and every keyword argument are forwarded unchanged to
    ``ptyprocess.PtyProcess.spawn``.
    '''
    spawn = ptyprocess.PtyProcess.spawn
    return spawn(args, **kwargs)
kubectl_command = [
'kubectl',
'--namespace', self.namespace,
'exec',
'-c', 'shell',
'--stdin'
] + tty_args + [
self.pod_name,
'--'
] + command
# FIXME: Is this async friendly?
if ssh_process.get_terminal_type():
# PtyProcess and asyncssh disagree on ordering of terminal size
ts = ssh_process.get_terminal_size()
process = PtyProcess.spawn(argv=kubectl_command, dimensions=(ts[1], ts[0]))
await ssh_process.redirect(process, process, process)
loop = asyncio.get_event_loop()
# Future for spawned process dying
# We explicitly create a threadpool of 1 thread for every run_in_executor call
# to help reason about interaction between asyncio and threads. A global threadpool
# is fine when using it as a queue (when doing HTTP requests, for example), but not
# here since we could end up deadlocking easily.
shell_completed = loop.run_in_executor(ThreadPoolExecutor(1), process.wait)
# Future for ssh connection closing
read_stdin = asyncio.ensure_future(ssh_process.stdin.read())
# This loop is here to pass TerminalSizeChanged events through to ptyprocess.
# It needs to break when the ssh connection is gone or when the spawned process is gone.
# See https://github.com/ronf/asyncssh/issues/134 for info on how this works
from .views import css
from .autocomplete import Autocomplete
import os
import threading
import re
import sublime
# Pick a pty backend for the current platform and record in SUPPORTED
# whether one could be imported.
try:
    if core.platform.windows:
        from winpty import PtyProcess
    else:
        import codecs

        from ptyprocess import PtyProcess as _PtyProcess

        class PtyProcess(_PtyProcess):
            """``ptyprocess`` process whose ``read`` returns UTF-8 decoded text."""

            def read(self):
                """Read from the pty and return the data decoded as UTF-8.

                Decoding is incremental: a multi-byte character split
                across two underlying reads is carried over to the next
                read rather than raising ``UnicodeDecodeError``. When a
                chunk holds only the start of a character, another chunk
                is read, so an empty string is returned only if the
                parent ``read`` itself returned no data.

                Bytes that are not valid UTF-8 still raise
                ``UnicodeDecodeError``. Exceptions from the parent
                ``read`` propagate unchanged.
                """
                # One decoder per process object, created on first use so
                # instances built by the parent's ``spawn`` need no extra
                # initialisation.
                decoder = getattr(self, '_utf8_decoder', None)
                if decoder is None:
                    decoder = codecs.getincrementaldecoder('utf-8')()
                    self._utf8_decoder = decoder
                while True:
                    chunk = super().read()
                    text = decoder.decode(chunk)
                    if text or not chunk:
                        return text

    SUPPORTED = True
except ImportError:
    # This stuff is broken on > 4000 until the packages are 3.8 compatible.
    SUPPORTED = False
class TtyProcess:
def __init__(self, command: List[str], on_output: Optional[Callable[[str], None]], on_close: Optional[Callable[[], None]] = None, cwd=None) -> None:
print('Starting process: {}'.format(command))
self.process = PtyProcess.spawn(command, cwd=cwd)
self.pid = self.process.pid
self.on_close = on_close
global pty_list
global pty_counter
# Create a new pty.
rows = cmd["rows"]
columns = cmd["columns"]
env = cmd["env"]
# Fix up the PATH variable on cygwin.
if sys.platform == "cygwin":
if "Path" in env and "PATH" not in env:
env["PATH"] = env["Path"]
del env["Path"]
env["PATH"] = cygwin_convert_path_variable(env["PATH"])
pty = ptyprocess.PtyProcess.spawn(cmd["argv"], dimensions=(rows, columns), env=env) #cwd=, )
pty_reader = NonblockingFileReader(read=pty.read)
pty_writer = NonblockingFileWriter(write=pty.write)
pty_id = pty_counter
pty_list.append({
"id": pty_id,
"pty": pty,
"reader": pty_reader,
"readDecoder": codecs.lookup("utf8").incrementaldecoder(errors="ignore"),
"writer": pty_writer})
pty_counter += 1
send_to_controller({ "type": "created", "id": pty_id })
return True