Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE: this snippet is truncated in the excerpt -- the dict literal returned
# at the end is cut off after 'sector_size', so any further keys are not
# visible here.
# Runs `blockdev --report <device>` through self.run and converts the first
# data row of the report into typed values.
@cached_property
def _data(self):
# Column names expected on the header row of the report.
header = ['RO', 'RA', 'SSZ', 'BSZ', 'StartSec', 'Size', 'Device']
command = 'blockdev --report %s'
blockdev = self.run(command % self.device)
# A non-zero exit status or any stderr output is treated as a failure.
if blockdev.rc != 0 or blockdev.stderr:
raise RuntimeError("Failed to gather data: %s" % blockdev.stderr)
output = blockdev.stdout.splitlines()
# Require a header line plus at least one data line.
if len(output) < 2:
raise RuntimeError("No data from %s" % self.device)
# Refuse to parse when the header does not match the expected layout.
if output[0].split() != header:
raise RuntimeError('Unknown output of blockdev: %s' % output[0])
# Only the second output line (first data row) is parsed.
fields = output[1].split()
return {
'rw_mode': str(fields[0]),
'read_ahead': int(fields[1]),
'sector_size': int(fields[2]),
@cached_property
def _command(self):
    """Return whatever ``self.find_command('netstat')`` yields.

    ``find_command`` is defined outside this excerpt; presumably it
    resolves the location of the ``netstat`` executable (to be confirmed).
    """
    netstat = self.find_command('netstat')
    return netstat
@cached_property
def _sysctl_command(self):
    """Return whatever ``self.find_command('sysctl')`` yields.

    ``find_command`` is defined outside this excerpt; presumably it
    resolves the location of the ``sysctl`` executable (to be confirmed).
    """
    sysctl = self.find_command('sysctl')
    return sysctl
@cached_property
def inventory(self):
    """Return the result of ``get_ansible_inventory`` for this object.

    The helper is called with ``self.ansible_config`` and
    ``self.inventory_file``, in that order; its return type is not
    visible in this excerpt.
    """
    return get_ansible_inventory(
        self.ansible_config,
        self.inventory_file,
    )
# NOTE: this snippet is truncated in the excerpt -- the `else:` branch at the
# end shows only its comment, and no return statement is visible.
# Creates a paramiko SSHClient and assembles connection settings from
# self.host and self.timeout, optionally applying an explicit SSH config file.
@cached_property
def client(self):
client = paramiko.SSHClient()
# Unknown host keys are handled by paramiko's WarningPolicy.
client.set_missing_host_key_policy(paramiko.WarningPolicy())
# Base connection settings; the port falls back to 22 when host.port is falsy.
cfg = {
"hostname": self.host.name,
"port": int(self.host.port) if self.host.port else 22,
"username": self.host.user,
"timeout": self.timeout,
}
# When an ssh config path is set, parse it and hand it to _load_ssh_config
# together with the client and the settings dict.
if self.ssh_config:
with open(self.ssh_config) as f:
ssh_config = paramiko.SSHConfig()
ssh_config.parse(f)
self._load_ssh_config(client, cfg, ssh_config)
else:
# fallback reading ~/.ssh/config
@cached_property
def ansible_config(self):
    """Return the result of calling ``get_ansible_config()`` with no arguments.

    The helper is defined outside this excerpt, so the shape of the
    returned configuration is not visible here.
    """
    config = get_ansible_config()
    return config
@cached_property
def _service_command(self):
    """Return whatever ``self.find_command("rc-service")`` yields.

    ``find_command`` is defined outside this excerpt; presumably it
    resolves the location of the ``rc-service`` executable (to be confirmed).
    """
    rc_service = self.find_command("rc-service")
    return rc_service
@cached_property
def _ip(self):
    """Return whatever ``self.find_command('ip')`` yields.

    ``find_command`` is defined outside this excerpt; presumably it
    resolves the location of the ``ip`` executable (to be confirmed).
    """
    ip_command = self.find_command('ip')
    return ip_command
@cached_property
def sysinfo(self):
sysinfo = {
"type": None,
"distribution": None,
"codename": None,
"release": None,
}
uname = self.run_expect([0, 1], 'uname -s')
if uname.rc == 1:
# FIXME: find a better way to detect windows here
sysinfo.update(**self._get_windows_sysinfo())
return sysinfo
sysinfo["type"] = uname.stdout.rstrip("\r\n").lower()
if sysinfo["type"] == "linux":
sysinfo.update(**self._get_linux_sysinfo())
elif sysinfo["type"] == "darwin":