Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_subprocess_interaction(anypython):
line = gateway_io.popen_bootstrapline
compile(line, "xyz", "exec")
args = [str(anypython), "-c", line]
popen = subprocess.Popen(
args,
bufsize=0,
universal_newlines=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
def send(line):
popen.stdin.write(line)
if sys.version_info > (3, 0) or sys.platform.startswith("java"):
popen.stdin.flush()
def receive():
def test_popen_args(spec, expected_args):
expected_args = expected_args + ["-u", "-c", gateway_io.popen_bootstrapline]
args = gateway_io.popen_args(execnet.XSpec(spec))
assert args == expected_args
def test_sshconfig_config_parsing(self, monkeypatch, makegateway):
l = []
monkeypatch.setattr(
gateway_io, "Popen2IOMaster", lambda *args, **kwargs: l.append(args[0])
)
py.test.raises(AttributeError, lambda: makegateway("ssh=xyz//ssh_config=qwe"))
assert len(l) == 1
popen_args = l[0]
i = popen_args.index("-F")
assert popen_args[i + 1] == "qwe"
if not spec:
spec = self.defaultspec
if not isinstance(spec, XSpec):
spec = XSpec(spec)
self.allocate_id(spec)
if spec.execmodel is None:
spec.execmodel = self.remote_execmodel.backend
if spec.via:
assert not spec.socket
master = self[spec.via]
proxy_channel = master.remote_exec(gateway_io)
proxy_channel.send(vars(spec))
proxy_io_master = gateway_io.ProxyIO(proxy_channel, self.execmodel)
gw = gateway_bootstrap.bootstrap(proxy_io_master, spec)
elif spec.popen or spec.ssh or spec.vagrant_ssh:
io = gateway_io.create_io(spec, execmodel=self.execmodel)
gw = gateway_bootstrap.bootstrap(io, spec)
elif spec.socket:
from execnet import gateway_socket
io = gateway_socket.create_io(spec, self, execmodel=self.execmodel)
gw = gateway_bootstrap.bootstrap(io, spec)
else:
raise ValueError("no gateway type found for {!r}".format(spec._spec))
gw.spec = spec
self._register(gw)
if spec.chdir or spec.nice or spec.env:
channel = gw.remote_exec(
"""
import os
path, nice, env = channel.receive()
if path:
def test_sshconfig_config_parsing(self, monkeypatch, makegateway):
l = []
monkeypatch.setattr(gateway_io, 'Popen',
lambda *args, **kwargs: l.append(args[0]))
py.test.raises(AttributeError, lambda:
makegateway("ssh=xyz//ssh_config=qwe"))
assert len(l) == 1
popen_args = l[0]
i = popen_args.index('-F')
assert popen_args[i+1] == "qwe"
# args = ['kitchen', 'exec', spec.ssh, '-c']
# but `exec` apparently doesn't connect stdin (yet)...
args = ['ssh', '-C'] + get_kitchen_ssh_connection_info(spec.ssh)
else:
args = ['ssh', '-C']
if spec.ssh_config is not None:
args.extend(['-F', str(spec.ssh_config)])
remotecmd = '%s -c "%s"' % (remotepython, popen_bootstrapline)
if spec.type == 'vagrant' or spec.type == 'kitchen':
args.extend([remotecmd])
else:
args.extend([spec.ssh, remotecmd])
return args
execnet.gateway_io.ssh_args = new_ssh_args
class RPCWrapper(object):
def __init__(self, host):
self.host = host
def __getattr__(self, name):
def call(*args, **kw):
output.annotate(
'rpc {}: {}(*{}, **{})'.format(self.host.fqdn, name, args, kw),
debug=True)
self.host.channel.send((name, args, kw))
while True:
message = self.host.channel.receive()
output.annotate('{}: message: {}'.format(
if not spec:
spec = self.defaultspec
if not isinstance(spec, XSpec):
spec = XSpec(spec)
self.allocate_id(spec)
if spec.execmodel is None:
spec.execmodel = self.remote_execmodel.backend
if spec.via:
assert not spec.socket
master = self[spec.via]
proxy_channel = master.remote_exec(gateway_io)
proxy_channel.send(vars(spec))
proxy_io_master = gateway_io.ProxyIO(proxy_channel, self.execmodel)
gw = gateway_bootstrap.bootstrap(proxy_io_master, spec)
elif spec.popen or spec.ssh:
io = gateway_io.create_io(spec, execmodel=self.execmodel)
gw = gateway_bootstrap.bootstrap(io, spec)
elif spec.socket:
from execnet import gateway_socket
io = gateway_socket.create_io(spec, self, execmodel=self.execmodel)
gw = gateway_bootstrap.bootstrap(io, spec)
else:
raise ValueError("no gateway type found for %r" % (spec._spec,))
gw.spec = spec
self._register(gw)
if spec.chdir or spec.nice or spec.env:
channel = gw.remote_exec("""
import os
path, nice, env = channel.receive()
if path:
if not os.path.exists(path):
os.mkdir(path)
remotepython = spec.python or 'python'
if spec.type == 'vagrant':
args = ['vagrant', 'ssh', spec.ssh, '--', '-C']
else:
args = ['ssh', '-C']
if spec.ssh_config is not None:
args.extend(['-F', str(spec.ssh_config)])
remotecmd = '%s -c "%s"' % (remotepython, popen_bootstrapline)
if spec.type == 'vagrant':
args.extend([remotecmd])
else:
args.extend([spec.ssh, remotecmd])
return args
import execnet.gateway_io
execnet.gateway_io.ssh_args = new_ssh_args
logger = logging.getLogger('batou.remote')
def main(environment, timeout, dirty):
environment = Environment(environment)
environment.load()
if timeout is not None:
environment.timeout = timeout
if environment.update_method == 'rsync':
pass
elif not dirty:
check_clean_hg_repository()
environment.load_secrets()