Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_path_open_local_write_remote_read(self):
    """Write a file locally, upload it, and read it back via the remote path.

    Creates one temporary directory on the remote machine and one on the
    local machine, writes ``bar.txt`` locally, uploads it with
    ``rem.upload`` and checks that the bytes read from the remote copy equal
    the bytes written.  Once both temporary-directory contexts have exited,
    it checks that neither directory exists any more.
    """
    with self._connect() as rem:
        # TODO: cf. note on Python 2.6 support above
        with rem.tempdir() as remote_tmpdir:
            with local.tempdir() as tmpdir:
                assert remote_tmpdir.is_dir()
                assert tmpdir.is_dir()
                data = six.b("hello world")
                local_file = tmpdir / "bar.txt"
                remote_file = remote_tmpdir / "bar.txt"
                with local_file.open("wb") as f:
                    f.write(data)
                rem.upload(local_file, remote_file)
                # Read back inside a ``with`` so the remote handle is closed
                # before the temporary directories are torn down.
                with remote_file.open("rb") as f:
                    assert f.read() == data
        # Checked while the connection is still open, after both tempdir
        # contexts have cleaned up.
        assert not remote_tmpdir.exists()
        assert not tmpdir.exists()
def write(self, data, encoding=None, mode=None):
    """Write ``data`` to this path.

    :param data: the content to write
    :param encoding: if truthy, ``data`` is first encoded with it
    :param mode: mode passed to ``self.open``; when ``None`` it is chosen
                 from the type of the (possibly encoded) data: ``'w'`` for
                 ``six.unicode_type`` values, ``'wb'`` otherwise
    """
    payload = data.encode(encoding) if encoding else data
    if mode is None:
        mode = 'w' if isinstance(payload, six.unicode_type) else 'wb'
    with self.open(mode) as stream:
        stream.write(payload)
def close(self):
    """Closes (terminates) the shell session

    Returns immediately when ``self.alive()`` is false.  Otherwise every
    step is best-effort: ``exit`` commands are written to the process's
    stdin, the three standard streams are closed, the process is killed,
    and finally ``self.proc`` is reset to ``None``.
    """
    if not self.alive():
        return

    proc = self.proc

    # Ask the shell to quit, then give it a brief moment to react.
    try:
        proc.stdin.write(six.b("\nexit\n\n\nexit\n\n"))
        proc.stdin.flush()
        time.sleep(0.05)
    except (ValueError, EnvironmentError):
        pass

    # A failure to close one stream must not prevent closing the others.
    for stream in (proc.stdin, proc.stdout, proc.stderr):
        try:
            stream.close()
        except Exception:
            pass

    try:
        proc.kill()
    except EnvironmentError:
        pass

    self.proc = None
def __call__(self, function):
m = six.getfullargspec(function)
args_names = list(m.args[1:])
positional = [None] * len(args_names)
varargs = None
for i in range(min(len(positional), len(self.args))):
positional[i] = self.args[i]
if len(args_names) + 1 == len(self.args):
varargs = self.args[-1]
# All args are positional, so convert kargs to positional
for item in self.kargs:
if item == m.varargs:
varargs = self.kargs[item]
else:
def list_processes(self):
    """
    Returns information about all running processes (on Windows: using ``tasklist``)

    Runs ``tasklist /V /FO CSV``, parses the CSV output and yields one
    ``ProcInfo`` per data row, built from the ``PID`` (converted to ``int``),
    ``User Name``, ``Status`` and ``Image Name`` columns, in that order.
    Yields nothing when the command produced no output at all.

    .. versionadded:: 1.3
    """
    import csv
    tasklist = local["tasklist"]
    output = tasklist("/V", "/FO", "CSV")
    if not six.PY3:
        # The Py2 csv reader does not support non-ascii values
        output = output.encode('ascii', 'ignore')
    rows = csv.reader(output.splitlines())
    # A bare next() would leak StopIteration out of this generator on empty
    # output; Python 3.7+ turns that into RuntimeError (PEP 479).
    header = next(rows, None)
    if header is None:
        return
    imgidx = header.index('Image Name')
    pididx = header.index('PID')
    statidx = header.index('Status')
    useridx = header.index('User Name')
    for row in rows:
        yield ProcInfo(
            int(row[pididx]), row[useridx], row[statidx], row[imgidx])
else:
return st
@staticmethod
def u(s):
    """Decode ``s`` with the ``unicode-escape`` codec and return the result."""
    decoded = s.decode("unicode-escape")
    return decoded
@staticmethod
def get_method_function(m):
    """Return the ``im_func`` attribute of the method object ``m``."""
    func = m.im_func
    return func
# Alias: publish ``unicode_type`` under the name ``str`` (presumably as an
# attribute of an enclosing compatibility class -- confirm against the full file).
str = unicode_type
# Try/except fails because io has the wrong StringIO in Python2
# You'll get str/unicode errors
if six.PY3:
from io import StringIO
else:
from StringIO import StringIO # type: ignore
@contextmanager
def captured_stdout(stdin=""):
"""
Captures stdout (similar to the redirect_stdout in Python 3.4+, but with slightly different arguments)
"""
prevstdin = sys.stdin
prevstdout = sys.stdout
sys.stdin = StringIO(six.u(stdin))
sys.stdout = StringIO()
try:
yield sys.stdout
def get_pe_subsystem(filename):
    """Return the 16-bit subsystem value read from a PE executable.

    The file must start with ``MZ``.  A 32-bit offset is read at
    ``LFANEW_OFFSET`` and must point at the ``PE\\0\\0`` signature.  The
    result is the 16-bit value found ``FILE_HEADER_SIZE + SUBSYSTEM_OFFSET``
    bytes after that signature (both constants are defined elsewhere in the
    module; presumably this locates the optional header's Subsystem field).

    :param filename: path of the file to inspect
    :returns: the subsystem value as an ``int``, or ``None`` if either
              signature does not match or the file is too short to hold
              the fields being read
    """
    with open(filename, "rb") as f:
        if f.read(2) != b"MZ":
            return None
        f.seek(LFANEW_OFFSET)
        raw = f.read(4)
        if len(raw) != 4:
            return None
        # "<L" is a fixed 4-byte little-endian integer.  Native "L" is
        # 8 bytes on LP64 platforms and would fail on a 4-byte read.
        lfanew = struct.unpack("<L", raw)[0]
        f.seek(lfanew)
        if f.read(4) != b"PE\x00\x00":
            return None
        # Relative seek: skip ahead from the end of the signature.
        f.seek(FILE_HEADER_SIZE + SUBSYSTEM_OFFSET, 1)
        raw = f.read(2)
        if len(raw) != 2:
            return None
        return struct.unpack("<H", raw)[0]
def _path_write(self, fn, data):
    """Write ``data`` to the remote file ``fn`` over SFTP.

    :param fn: the destination path; converted with ``str()`` before opening
    :param data: the content to write; ``six.unicode_type`` values are
                 encoded with ``self.custom_encoding`` when that is set
    """
    if self.custom_encoding and isinstance(data, six.unicode_type):
        data = data.encode(self.custom_encoding)
    f = self.sftp.open(str(fn), 'wb')
    try:
        f.write(data)
    finally:
        # Release the remote handle even when the write fails.
        f.close()
def _path_stat(self, fn):
def __init__(self, argv, retcode, stdout, stderr):
    """Record the command line, return code and captured output.

    All four values are forwarded unchanged to ``Exception.__init__``.  On
    Python 3, ``stdout`` / ``stderr`` values that are ``six.bytes`` are
    passed through ``six.ascii`` before being stored on the instance.
    """
    Exception.__init__(self, argv, retcode, stdout, stderr)
    self.argv = argv
    self.retcode = retcode
    self.stdout = (six.ascii(stdout)
                   if six.PY3 and isinstance(stdout, six.bytes) else stdout)
    self.stderr = (six.ascii(stderr)
                   if six.PY3 and isinstance(stderr, six.bytes) else stderr)
def popen(self, args=(), **kwargs):
    """Spawn the wrapped command with one standard stream redirected.

    ``self.file`` is installed under ``kwargs[self.KWARG]``.  If it is a
    string or ``LocalPath`` it is opened with ``self.MODE`` first, and the
    handle opened here is closed again once ``self.cmd.popen`` has returned
    or raised.  Anything else is passed through as is.

    :raises RedirectionError: if ``kwargs`` already carries a value other
                              than ``PIPE`` or ``None`` under ``self.KWARG``
    :raises TypeError: if ``self.file`` is a ``RemotePath``
    """
    from plumbum.machines.local import LocalPath
    from plumbum.machines.remote import RemotePath

    key = self.KWARG
    target = self.file

    if key in kwargs and kwargs[key] not in (PIPE, None):
        raise RedirectionError("%s is already redirected" % (key, ))
    if isinstance(target, RemotePath):
        raise TypeError("Cannot redirect to/from remote paths")

    opened = None
    if isinstance(target, six.string_types + (LocalPath, )):
        opened = open(str(target), self.MODE)
        kwargs[key] = opened
    else:
        kwargs[key] = target

    try:
        return self.cmd.popen(args, **kwargs)
    finally:
        # Only close what was opened here; caller-supplied objects are left alone.
        if opened is not None:
            opened.close()