Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'port': kwargs.get('port', 22),
'ssh_opts': SSH_OPTS,
'scp_opts': SSH_OPTS}
if password:
settings['password'] = kwargs.get('password')
if keyfile:
keyfile = os.path.expanduser(keyfile)
assert os.path.isfile(keyfile), 'No keyfile {} exists?'.format(keyfile)
log.debug("Attempting to auth ssh with keyfile {}".format(keyfile))
settings['keyfile'] = keyfile
elif password:
settings['password'] = kwargs.get('password')
ssh = SshMachine(hostname, **settings)
return ssh
def test_deploy(self):
rem = SshMachine("localhost")
SshMachine.python = rem[sys.executable]
with DeployedServer(rem) as dep:
conn = dep.classic_connect()
print(conn.modules.sys)
func = conn.modules.os.getcwd
print(func())
try:
func()
except EOFError:
pass
else:
self.fail("expected an EOFError")
with SshMachine("localhost") as rem:
with rem.tempdir() as dir2:
copy(dir / "orig", dir2)
s3 = sorted(f.name for f in (dir2 / "orig").walk())
assert s1 == s3
copy(dir2 / "orig", dir2 / "dup")
s4 = sorted(f.name for f in (dir2 / "dup").walk())
assert s1 == s4
copy(dir2 / "dup", dir / "dup2")
s5 = sorted(f.name for f in (dir / "dup2").walk())
assert s1 == s5
with SshMachine("localhost") as rem2:
with rem2.tempdir() as dir3:
copy(dir2 / "dup", dir3)
s6 = sorted(f.name for f in (dir3 / "dup").walk())
assert s1 == s6
move(dir3 / "dup", dir / "superdup")
assert not (dir3 / "dup").exists()
s7 = sorted(f.name for f in (dir / "superdup").walk())
assert s1 == s7
# test rm
delete(dir)
def _connect(self):
return SshMachine(TEST_HOST)
def connect():
SshMachine(TEST_HOST, password='swordfish',
ssh_opts=['-o', 'UserKnownHostsFile=/dev/null',
'-o', 'UpdateHostKeys=no'])
pytest.raises(HostPublicKeyUnknown, connect)
def connect(self):
m = SshMachine(TEST_HOST)
self.remotes.append(m)
return m
def test_putty_command(self, mocker, ssh_port):
local = mocker.patch('plumbum.machines.ssh_machine.local')
init = mocker.spy(SshMachine, '__init__')
mocker.patch('plumbum.machines.ssh_machine.BaseRemoteMachine')
host = mocker.MagicMock()
user = local.env.user
port = keyfile = None
ssh_command = local["plink"]
scp_command = local["pscp"]
ssh_opts = ["-ssh"]
if ssh_port == 'default':
putty_port = None
scp_opts = ()
else:
putty_port = int(ssh_port)
ssh_opts.extend(['-P', ssh_port])
scp_opts = ['-P', ssh_port]
encoding = mocker.MagicMock()
keyfile = None
if 'ssh_keyfile' in self.global_config:
keyfile = self.global_config['ssh_keyfile']
ssh_opts += ('-o', 'IdentityFile=%s' % keyfile)
if self.use_controlpersist:
ssh_opts += ('-oControlMaster=auto',
'-oControlPersist=30m',
'-oControlPath=~/.ssh/distaf-ssh-%r@%h:%p')
conn_name = "%s@%s" % (user, node)
# if no existing connection, create one
if conn_name not in self.sshconns:
# we already have plumbum imported for rpyc, so let's use it
ssh = SshMachine(node, user, ssh_opts=ssh_opts)
self.sshconns[conn_name] = ssh
else:
ssh = self.sshconns[conn_name]
if ssh:
self.logger.debug("Have ssh for %s. Returning ssh." % conn_name)
return ssh
self.logger.error("oops. did not get ssh for %s", conn_name)
return None
import sys
import rpyc
from plumbum import SshMachine
from rpyc.utils.zerodeploy import DeployedServer
from rpyc.utils.splitbrain import splitbrain
mach = SshMachine("192.168.1.117")
print sys.platform
with DeployedServer(mach) as dep:
conn = dep.classic_connect()
print conn.modules.sys.platform
try:
import posix
except ImportError as ex:
print ex
with splitbrain(conn):
import posix
print posix.stat("/boot")