Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# -*- coding: utf-8 -*-
import execnet
def multiplier(channel, factor):
while not channel.isclosed():
param = channel.receive()
channel.send(param * factor)
gw = execnet.makegateway()
channel = gw.remote_exec(multiplier, factor=10)
for i in range(5):
channel.send(i)
result = channel.receive()
assert result == i * 10
gw.exit()
def test_hrsync_one_host(self, mysetup):
source, dest = mysetup.source, mysetup.dest
gw = execnet.makegateway("popen//chdir=%s" % dest)
finished = []
rsync = HostRSync(source)
rsync.add_target_host(gw, finished=lambda: finished.append(1))
source.join("hello.py").write("world")
rsync.send()
gw.exit()
assert dest.join(source.basename, "hello.py").check()
assert len(finished) == 1
def test_hrsync_one_host(self, mysetup):
source, dest = mysetup.source, mysetup.dest
gw = execnet.makegateway("popen//chdir=%s" % dest)
finished = []
rsync = HostRSync(source)
rsync.add_target_host(gw, finished=lambda: finished.append(1))
source.join("hello.py").write("world")
rsync.send()
gw.exit()
assert dest.join(source.basename, "hello.py").check()
assert len(finished) == 1
def test_basic_group(self, monkeypatch):
import atexit
atexitlist = []
monkeypatch.setattr(atexit, "register", atexitlist.append)
group = Group()
assert atexitlist == [group._cleanup_atexit]
exitlist = []
joinlist = []
class PseudoIO:
def wait(self):
pass
class PseudoSpec:
via = None
class PseudoGW:
id = "9999"
_io = PseudoIO()
spec = PseudoSpec()
def test_group_PopenGateway(self):
group = Group()
gw = group.makegateway("popen")
assert list(group) == [gw]
assert group[0] == gw
assert len(group) == 1
group._cleanup_atexit()
assert not group._gateways
def bootstrap_exec(io, spec):
try:
sendexec(
io,
inspect.getsource(gateway_base),
"execmodel = get_execmodel(%r)" % spec.execmodel,
"io = init_popen_io(execmodel)",
"io.write('1'.encode('ascii'))",
"serve(io, id='%s-worker')" % spec.id,
)
s = io.read(1)
assert s == "1".encode("ascii")
except EOFError:
ret = io.wait()
if ret == 255:
raise HostNotFound(io.remoteaddress)
assert not spec.python, "socket: specifying python executables not yet supported"
gateway_id = spec.installvia
if gateway_id:
host, port = start_via(group[gateway_id])
else:
host, port = spec.socket.split(":")
port = int(port)
socket = execmodel.socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
io = SocketIO(sock, execmodel)
io.remoteaddress = "%s:%d" % (host, port)
try:
sock.connect((host, port))
except execmodel.socket.gaierror:
raise HostNotFound(str(sys.exc_info()[1]))
return io
def test_deprecation(recwarn, monkeypatch):
execnet.PopenGateway().exit()
assert recwarn.pop(DeprecationWarning)
monkeypatch.setattr(socket, "socket", fails)
py.test.raises(Exception, execnet.SocketGateway, "localhost", 8811)
assert recwarn.pop(DeprecationWarning)
monkeypatch.setattr(subprocess, "Popen", fails)
py.test.raises(Exception, execnet.SshGateway, "not-existing")
assert recwarn.pop(DeprecationWarning)
def test_deprecation(recwarn, monkeypatch):
execnet.PopenGateway().exit()
assert recwarn.pop(DeprecationWarning)
monkeypatch.setattr(py.std.socket, 'socket', lambda *args: 0/0)
py.test.raises(Exception, 'execnet.SocketGateway("localhost", 8811)')
assert recwarn.pop(DeprecationWarning)
monkeypatch.setattr(py.std.subprocess, 'Popen', lambda *args,**kwargs: 0/0)
py.test.raises(Exception, 'execnet.SshGateway("not-existing")')
assert recwarn.pop(DeprecationWarning)
def getgateway(host, configfile=None):
xspec = "ssh=%s" % host
if configfile is not None:
xspec += "//ssh_config=%s" % configfile
return execnet.makegateway(xspec)