Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_chown(self):
    """A file created in a temp dir belongs to the current user and group,
    and chown-ing it to its current owner (given by name) keeps the uid."""
    with local.tempdir() as tmp:
        target = tmp / "foo.txt"
        target.write(six.b("hello"))
        # Freshly written file: owner and group match the running process.
        assert target.uid == os.getuid()
        assert target.gid == os.getgid()
        # Re-apply the existing owner, addressed by name rather than number.
        owner_name = target.uid.name
        target.chown(owner_name)
        assert target.uid == os.getuid()
def test_tunnel(self):
    """Send data through a tunnel to a program running on the remote side.

    Starts ``self.TUNNEL_PROG`` under the remote python, reads the port
    number it prints on its first stdout line, opens a tunnel from local
    port 12222 to that port, sends ``world`` and expects ``hello world``
    back (presumably TUNNEL_PROG prefixes the payload with "hello " --
    its source is not visible here).
    """
    # NOTE(review): the nesting of the final print/assert relative to the
    # tunnel block was reconstructed from statement order -- confirm.
    with self._connect() as rem:
        p = (rem.python["-u"] << self.TUNNEL_PROG).popen()
        try:
            port = int(p.stdout.readline().decode("ascii").strip())
        except ValueError:
            # The first line was not a port number: dump whatever the
            # remote program printed to help diagnose, then re-raise.
            print(p.communicate())
            raise

        with rem.tunnel(12222, port):
            s = socket.socket()
            try:
                s.connect(("localhost", 12222))
                # sendall() retries until the whole payload is written;
                # send() may write only part of it.
                s.sendall(six.b("world"))
                data = s.recv(100)
            finally:
                # Close the socket even if connect/send/recv raised.
                s.close()

        print(p.communicate())
        assert data == b"hello world"
self.stdin.flush()
i = (i + 1) % len(sources)
name, coll, pipe, outfile = sources[i]
line = pipe.readline()
# logger.debug("%s> %r", name, line)
if not line:
del sources[i]
elif outfile:
outfile.write(line)
outfile.flush()
else:
coll.append(line)
self.wait()
stdout = six.b("").join(six.b(s) for s in stdout)
stderr = six.b("").join(six.b(s) for s in stderr)
return stdout, stderr
self.stdin.write(line)
self.stdin.flush()
i = (i + 1) % len(sources)
name, coll, pipe, outfile = sources[i]
line = pipe.readline()
# logger.debug("%s> %r", name, line)
if not line:
del sources[i]
elif outfile:
outfile.write(line)
outfile.flush()
else:
coll.append(line)
self.wait()
stdout = six.b("").join(six.b(s) for s in stdout)
stderr = six.b("").join(six.b(s) for s in stderr)
return stdout, stderr
def readline(self):
    """Return the next line from the pipe, or empty bytes at the marker.

    Once a line equal to the marker (after stripping whitespace) has been
    read, the pipe reference is dropped and this call and all later calls
    return empty bytes.

    Raises ``EOFError`` if the underlying pipe returns an empty read
    before the marker is reached.
    """
    source = self.pipe
    if source is None:
        # Marker was already consumed by an earlier call.
        return six.b("")
    data = source.readline()
    if not data:
        raise EOFError()
    if data.strip() != self.marker:
        return data
    # Marker reached: stop reading from the pipe from now on.
    self.pipe = None
    return six.b("")
def _read_all(self):
    """Rewind the underlying file object and return its entire content.

    Data is read in pieces of ``self.CHUNK_SIZE``; a read shorter than
    that is taken as the end of the data. The pieces are joined into a
    single bytes object.
    """
    fileobj = self._fileobj
    fileobj.seek(0)
    chunk = fileobj.read(self.CHUNK_SIZE)
    chunks = [chunk]
    # A full-sized chunk means there may be more to read.
    while len(chunk) >= self.CHUNK_SIZE:
        chunk = fileobj.read(self.CHUNK_SIZE)
        chunks.append(chunk)
    return six.b("").join(chunks)
if isinstance(cmd, BaseCommand):
full_cmd = cmd.formulate(1)
else:
full_cmd = cmd
marker = "--.END%s.--" % (time.time() * random.random(), )
if full_cmd.strip():
full_cmd += " ; "
else:
full_cmd = "true ; "
full_cmd += "echo $? ; echo '%s'" % (marker, )
if not self.isatty:
full_cmd += " ; echo '%s' 1>&2" % (marker, )
if self.custom_encoding:
full_cmd = full_cmd.encode(self.custom_encoding)
shell_logger.debug("Running %r", full_cmd)
self.proc.stdin.write(full_cmd + six.b("\n"))
self.proc.stdin.flush()
self._current = SessionPopen(
self.proc, full_cmd, self.isatty, self.proc.stdin,
MarkedPipe(self.proc.stdout, marker),
MarkedPipe(self.proc.stderr, marker), self.custom_encoding)
return self._current
def get_pe_subsystem(filename):
    """Return the subsystem value stored in a PE file's header.

    The file must start with the ``MZ`` signature. A 4-byte offset is
    read at ``LFANEW_OFFSET``, and the ``PE\\0\\0`` signature is expected
    at that offset. The 2-byte subsystem value is then read
    ``FILE_HEADER_SIZE + SUBSYSTEM_OFFSET`` bytes past the signature.

    :param filename: path of the file to inspect
    :returns: the subsystem as an integer, or ``None`` if the file does
              not carry the expected signatures or is too short to
              contain the fields
    """
    with open(filename, "rb") as f:
        if f.read(2) != six.b("MZ"):
            return None
        f.seek(LFANEW_OFFSET)
        raw = f.read(4)
        if len(raw) != 4:
            # Truncated file: cannot be a valid PE image.
            return None
        # "<L" = little-endian, standard 4-byte size. A bare "L" uses the
        # platform's native long, which is 8 bytes on 64-bit Linux/macOS
        # and makes unpack() fail on a 4-byte buffer.
        lfanew = struct.unpack("<L", raw)[0]
        f.seek(lfanew)
        if f.read(4) != six.b("PE\x00\x00"):
            return None
        # Skip forward relative to the current position (whence=1).
        f.seek(FILE_HEADER_SIZE + SUBSYSTEM_OFFSET, 1)
        raw = f.read(2)
        if len(raw) != 2:
            return None
        return struct.unpack("<H", raw)[0]
while True:
rlist, _, _ = select([proc.stdout.channel], [], [])
for _ in rlist:
yield
else:
# Python 3.4 implementation
def selector():
sel = DefaultSelector()
sel.register(proc.stdout.channel, EVENT_READ)
while True:
for key, mask in sel.select():
yield
for _ in selector():
if proc.stdout.channel.recv_ready():
yield 0, decode(six.b(proc.stdout.readline(linesize)))
if proc.stdout.channel.recv_stderr_ready():
yield 1, decode(six.b(proc.stderr.readline(linesize)))
if proc.poll() is not None:
break
for line in proc.stdout:
yield 0, decode(six.b(line))
for line in proc.stderr:
yield 1, decode(six.b(line))