Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if not suppress_known_hosts or b'Warning: Permanently added' not in data:
sys.stderr.write(data.decode('utf-8', 'ignore'))
finally:
os.close(pty_fd)
pid, retval = os.waitpid(pid, 0)
retval = (retval & 0xff00) >> 8
return retval, output
class Error(Exception):
pass
class ArgError(Error):
pass
class SshExecError(Error):
def __init__(self, msg, retval):
super().__init__(msg)
self.retval = retval
# Arguments to pass to SSH to allow a man in the middle attack
mitm_args = ['-oStrictHostKeyChecking=no', '-oUserKnownHostsFile=/dev/null']
def ssh_from_cfg(cfg_filename, username, password, hostname=None, allow_mitm=False, no_resolve=False):
dirty = True
cfg = configparser.ConfigParser()
cfg.setdefault('auth', {})
if exists(cfg_filename):
cfg.read(cfg_filename)
dirty = False
cmdparser = subparser.add_parser(command, help=inspect.getdoc(fn),
parents=[shared])
opt_fn(cmdparser)
cmdparser.set_defaults(cmdobj=fn)
options = parser.parse_args(args)
if options.robot:
installer.set_hostname(options.robot)
try:
retval = options.cmdobj(options)
installer.execute_remote()
except ArgError as e:
parser.error(str(e))
retval = 1
except Error as e:
sys.stderr.write(str(e) + '\n')
retval = 1
if retval is None:
retval = 0
elif retval is True:
retval = 0
elif retval is False:
retval = 1
return retval
hn = hostname.lower()
for line in fp:
if re.match(r'\s*host\s+%s\s*' % hn, line.lower()):
no_resolve = True
break
except Exception:
pass
if not no_resolve:
try:
print("Looking up hostname", hostname, '...')
# addrs = [(family, socktype, proto, canonname, sockaddr)]
addrs = socket.getaddrinfo(hostname, None)
except socket.gaierror as e:
raise Error("Could not find robot at %s" % hostname) from e
# Sort the address by family.
# Lucky for us, the family type is the first element of the tuple, and it's an enumerated type with
# AF_INET=2 (IPv4) and AF_INET6=23 (IPv6), so sorting them will provide us with the AF_INET address first.
addrs.sort()
# pick the first address that is sock_stream
# AF_INET sockaddr tuple: (address, port)
# AF_INET6 sockaddr tuple: (address, port, flow info, scope id)
for _, socktype, _, _, sockaddr in addrs:
if socktype == socket.SOCK_STREAM:
ip = sockaddr[0] # The address if the first tuple element for both AF_INET and AF_INET6
print("-> Found %s at %s" % (hostname, ip))
print()
hostname = ip
break
def install_opkg(self, options):
opkg = self._get_opkg()
# Write out the install script
# -> we use a script because opkg doesn't have a good mechanism
# to only install a package if it's not already installed
opkg_script_fname = join(self.opkg_cache, 'install_opkg.sh')
opkg_script = ""
opkg_files = []
package_list = opkg.resolve_pkg_deps(options.packages)
for package in package_list:
try:
pkg, fname = opkg.get_cached_pkg(package)
except OpkgError as e:
raise Error(e)
opkg_script_bit = inspect.cleandoc('''
set -e
if ! opkg list-installed | grep -F '%(name)s - %(version)s'; then
opkg install %(options)s opkg_cache/%(fname)s
else
echo "%(name)s already installed, continuing..."
fi
''')
opkg_script_bit %= {
'fname': basename(fname),
'name': pkg['Package'],
'version': pkg['Version'],
'options': "--force-reinstall" if options.force_reinstall else ""
}
def download_pip(self, options):
'''
Specify python package(s) to download, and store them in the cache
'''
ensure_win_bins()
try:
import pip
except ImportError:
raise Error("ERROR: pip must be installed to download python packages")
if len(options.requirement) == 0 and len(options.packages) == 0:
raise ArgError("You must give at least one requirement to install")
# Use pip install --download to put packages into the cache
pip_args = ['install',
'--download',
self.pip_cache]
pip_args.extend(self._process_pip_args(options))
for r in options.requirement:
pip_args.extend(['-r', r])
pip_args.extend(options.packages)
if is_windows:
cmd = join(self.win_bins, 'psftp.exe')
# psftp has a -pw argument we can use, which is nice
sftp_args = [ cmd, '-pw', self.password ] + sftp_args
try:
subprocess.check_call(sftp_args)
except subprocess.CalledProcessError as e:
raise SshExecError(e, e.returncode)
else:
cmd = shutil.which('sftp')
if cmd is None:
raise Error("Cannot find sftp executable!")
if self._allow_mitm:
sftp_args = mitm_args + sftp_args
# Must disable BatchMode, else password interaction doesn't work
sftp_args = [cmd, '-oBatchMode=no'] + sftp_args
retval, _ = ssh_exec_pass(self.password, sftp_args,
suppress_known_hosts=self._allow_mitm)
if retval != 0:
raise SshExecError('Command %s returned non-zero error status %s' % (sftp_args, retval),
retval)
finally:
try:
os.unlink(bfname)
data = _read(stderr_fd)
if data is not None:
if not suppress_known_hosts or b'Warning: Permanently added' not in data:
sys.stderr.write(data.decode('utf-8', 'ignore'))
finally:
os.close(pty_fd)
pid, retval = os.waitpid(pid, 0)
retval = (retval & 0xff00) >> 8
return retval, output
class Error(Exception):
pass
class ArgError(Error):
pass
class SshExecError(Error):
def __init__(self, msg, retval):
super().__init__(msg)
self.retval = retval
# Arguments to pass to SSH to allow a man in the middle attack
mitm_args = ['-oStrictHostKeyChecking=no', '-oUserKnownHostsFile=/dev/null']
def ssh_from_cfg(cfg_filename, username, password, hostname=None, allow_mitm=False, no_resolve=False):
dirty = True
cfg = configparser.ConfigParser()
cfg.setdefault('auth', {})