Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
raise pytest.UsageError(e)
# Return the host name as a string
# metafunc.parametrize("ansible_host", hosts.keys())
# Return a HostManager instance where pattern=host (e.g. ansible_host.all.shell('date'))
# metafunc.parametrize("ansible_host", iter(plugin.initialize(config=plugin.config, pattern=h) for h in
# hosts.keys()))
# Return a ModuleDispatcher instance representing `host` (e.g. ansible_host.shell('date'))
metafunc.parametrize("ansible_host", iter(hosts[h] for h in hosts.keys()))
if 'ansible_group' in metafunc.fixturenames:
# assert required --ansible-* parameters were used
PyTestAnsiblePlugin.assert_required_ansible_parameters(metafunc.config)
try:
plugin = metafunc.config.pluginmanager.getplugin("ansible")
hosts = plugin.initialize(config=plugin.config, pattern=metafunc.config.getoption('ansible_host_pattern'))
except ansible.errors.AnsibleError as e:
raise pytest.UsageError(e)
# FIXME: Eeew, this shouldn't be interfacing with `hosts.options`
groups = hosts.options['inventory_manager'].list_groups()
# Return the group name as a string
# metafunc.parametrize("ansible_group", groups)
# Return a ModuleDispatcher instance representing the group (e.g. ansible_group.shell('date'))
metafunc.parametrize("ansible_group", iter(hosts[g] for g in groups))
def execute_update(self):
'''
Updates a single setting in the specified ansible.cfg
'''
raise AnsibleError("Option not implemented yet")
# pylint: disable=unreachable
if context.CLIARGS['setting'] is None:
raise AnsibleOptionsError("update option requires a setting to update")
(entry, value) = context.CLIARGS['setting'].split('=')
if '.' in entry:
(section, option) = entry.split('.')
else:
section = 'defaults'
option = entry
subprocess.call([
'ansible',
'-m', 'ini_file',
'localhost',
'-c', 'local',
def check_alias_name(name):
""" check an alias name """
# todo: check reserved keywords (any, self, ...)
if re.match('^[a-zA-Z0-9_]+$', name) is None:
raise AnsibleError(name + ': the name of the alias may only consist of the characters "a-z, A-Z, 0-9 and _"')
self._loader = loader
self.groups = groups
# Support inventory scripts that are not prefixed with some
# path information but happen to be in the current working
# directory when '.' is not in PATH.
self.filename = os.path.abspath(filename)
cmd = [ self.filename, "--list" ]
try:
sp = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except OSError as e:
raise AnsibleError("problem running %s (%s)" % (' '.join(cmd), e))
(stdout, stderr) = sp.communicate()
if sp.returncode != 0:
raise AnsibleError("Inventory script (%s) had an execution error: %s " % (filename,stderr))
# make sure script output is unicode so that json loader will output
# unicode strings itself
try:
self.data = to_unicode(stdout, errors="strict")
except Exception as e:
raise AnsibleError("inventory data from {0} contained characters that cannot be interpreted as UTF-8: {1}".format(to_str(self.filename), to_str(e)))
# see comment about _meta below
self.host_vars_from_top = None
self._parse(stderr)
self._play_context.remote_addr.lower(),
username=self._play_context.remote_user,
allow_agent=allow_agent,
look_for_keys=self.get_option('look_for_keys'),
key_filename=key_filename,
password=self._play_context.password,
timeout=self._play_context.timeout,
port=port,
**sock_kwarg
)
except paramiko.ssh_exception.BadHostKeyException as e:
raise AnsibleConnectionFailure('host key mismatch for %s' % e.hostname)
except Exception as e:
msg = str(e)
if "PID check failed" in msg:
raise AnsibleError("paramiko version issue, please upgrade paramiko on the machine running ansible")
elif "Private key file is encrypted" in msg:
msg = 'ssh %s@%s:%s : %s\nTo connect as a different user, use -u .' % (
self._play_context.remote_user, self._play_context.remote_addr, port, msg)
raise AnsibleConnectionFailure(msg)
else:
raise AnsibleConnectionFailure(msg)
return ssh
def fetch_file(self, in_path, out_path):
''' save a remote file to the specified path '''
super(Connection, self).fetch_file(in_path, out_path)
display.vvv("FETCH %s TO %s" % (in_path, out_path), host=self._play_context.remote_addr)
try:
self.sftp = self._connect_sftp()
except Exception as e:
raise AnsibleError("failed to open a SFTP connection (%s)" % to_native(e))
try:
self.sftp.get(to_bytes(in_path, errors='surrogate_or_strict'), to_bytes(out_path, errors='surrogate_or_strict'))
except IOError:
raise AnsibleError("failed to transfer file from %s" % in_path)
requires_os_version_match = requires_os_version.match(line)
if requires_os_version_match:
min_os_version = to_text(requires_os_version_match.group(1))
if requires_os_version_match.group(2) is None:
min_os_version = "%s.0" % min_os_version
requires_become_match = requires_become.match(line)
if requires_become_match:
become_required = True
for m in set(module_names):
m = to_text(m).rstrip() # tolerate windows line endings
mu_path = ps_module_utils_loader.find_plugin(m, ".psm1")
if not mu_path:
raise AnsibleError('Could not find imported module support code for \'%s\'.' % m)
exec_manifest["powershell_modules"][m] = to_text(
base64.b64encode(
to_bytes(
_slurp(mu_path)
)
)
)
exec_manifest['min_ps_version'] = min_ps_version
exec_manifest['min_os_version'] = min_os_version
if become_required and 'become' not in exec_manifest["actions"]:
exec_manifest["actions"].insert(0, 'become')
exec_manifest["become_user"] = "SYSTEM"
exec_manifest["become_password"] = None
exec_manifest['become_flags'] = None
exec_manifest["become"] = to_text(base64.b64encode(to_bytes(become_wrapper)))
try:
plugin.update_cache_if_changed()
except AttributeError:
# some plugins might not implement caching
pass
parsed = True
display.vvv('Parsed %s inventory source with %s plugin' % (source, plugin_name))
break
except AnsibleParserError as e:
display.debug('%s was not parsable by %s' % (source, plugin_name))
tb = ''.join(traceback.format_tb(sys.exc_info()[2]))
failures.append({'src': source, 'plugin': plugin_name, 'exc': e, 'tb': tb})
except Exception as e:
display.debug('%s failed while attempting to parse %s' % (plugin_name, source))
tb = ''.join(traceback.format_tb(sys.exc_info()[2]))
failures.append({'src': source, 'plugin': plugin_name, 'exc': AnsibleError(e), 'tb': tb})
else:
display.vvv("%s declined parsing %s as it did not pass its verify_file() method" % (plugin_name, source))
else:
if not parsed and failures:
# only if no plugin processed files should we show errors.
for fail in failures:
display.warning(u'\n* Failed to parse %s with %s plugin: %s' % (to_text(fail['src']), fail['plugin'], to_text(fail['exc'])))
if 'tb' in fail:
display.vvv(to_text(fail['tb']))
if C.INVENTORY_ANY_UNPARSED_IS_FAILED:
raise AnsibleError(u'Completely failed to parse inventory source %s' % (source))
if not parsed:
if source != '/etc/ansible/hosts' or os.path.exists(source):
# only warn if NOT using the default and if using it, only if the file is present
display.warning("Unable to parse %s as an inventory source" % source)
def add_group(self, group):
if group.name not in self.groups:
self.groups[group.name] = group
else:
raise AnsibleError("group already in inventory: %s" % group.name)
if self.private_key_file:
key_filename = os.path.expanduser(self.private_key_file)
elif self.runner.private_key_file:
key_filename = os.path.expanduser(self.runner.private_key_file)
else:
key_filename = None
ssh.connect(self.host, username=self.user, allow_agent=allow_agent, look_for_keys=True,
key_filename=key_filename, password=self.password,
timeout=self.runner.timeout, port=self.port)
except Exception, e:
msg = str(e)
if "PID check failed" in msg:
raise errors.AnsibleError("paramiko version issue, please upgrade paramiko on the machine running ansible")
elif "Private key file is encrypted" in msg:
msg = 'ssh %s@%s:%s : %s\nTo connect as a different user, use -u .' % (
self.user, self.host, self.port, msg)
raise errors.AnsibleConnectionFailed(msg)
else:
raise errors.AnsibleConnectionFailed(msg)
return ssh