Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def tearDown(self):
    '''
    Look up the ``username`` grain, then run the parent class teardown.

    NOTE(review): the original docstring described this as the teardown
    that removes installed packages, but no removal happens in the code
    visible here and ``user`` is not read afterwards -- confirm whether
    part of this method is missing.
    '''
    user_info = self.run_call(' grains.get username', local=True)
    # Only trust the result when it is a non-empty list/tuple whose last
    # element is a string; otherwise fall back to an empty user name.
    has_username = (
        user_info
        and isinstance(user_info, (list, tuple))
        and isinstance(user_info[-1], six.string_types)
    )
    user = user_info[-1].strip() if has_username else ''
    super(CallTest, self).tearDown()
'isbase': [False, False, False, False, False, True, True],
'installed': [False, False, False, False, False, True, True],
'registerrelease': [None, None, None, None, None, None, "42"],
}}.items():
ref_out = {
'retcode': 0,
'stdout': get_test_data(filename)
}
with patch.dict(zypper.__salt__, {'cmd.run_all': MagicMock(return_value=ref_out)}):
products = zypper.list_products()
self.assertEqual(len(products), 7)
self.assertIn(test_data['vendor'], [product['vendor'] for product in products])
for kwd in ['name', 'isbase', 'installed', 'release', 'productline', 'eol_t', 'registerrelease']:
if six.PY3:
self.assertCountEqual(test_data[kwd], [prod.get(kwd) for prod in products])
else:
self.assertEqual(test_data[kwd], sorted([prod.get(kwd) for prod in products]))
def setUp(self):
    '''
    Generate a name for the test and store it on ``self.name``.

    The name is the first 16 hexadecimal digits of the SHA-1 digest of the
    current timestamp.
    '''
    # hashlib only accepts bytes: feeding it text raises TypeError on
    # Python 3, so the timestamp is encoded before hashing.
    timestamp = str(time.time()).encode('utf-8')
    self.name = hashlib.sha1(timestamp).hexdigest()[:16]
' (15.10) and later.'
)
if grains['os_family'] == 'Debian':
try:
from aptsources import sourceslist
except ImportError:
self.skipTest(
'aptsources.sourceslist python module not found'
)
ret = self.run_function('state.sls', mods='pkgrepo.managed', timeout=120)
# If the below assert fails then no states were run, and the SLS in
# tests/integration/files/file/base/pkgrepo/managed.sls needs to be
# corrected.
self.assertReturnNonEmptySaltType(ret)
for state_id, state_result in six.iteritems(ret):
self.assertSaltTrueReturn(dict([(state_id, state_result)]))
jp_data = jp_data.read()
container = False
if jp_data.startswith('#!'):
args = jp_data[:jp_data.find('\n')].split()
if len(args) >= 2:
container = args[1]
jp_data = jp_data[(jp_data.find('\n') + 1):]
if not jp_data.strip():
return {}
properties = jp()
properties.load(jp_data)
if container:
return {container: dict([(k, properties[k]) for k in six.iterkeys(properties)])}
else:
return dict([(k, properties[k]) for k in six.iterkeys(properties)])
def cache_kwargs(*args, **kw):
    '''
    Normalise keyword arguments before they are handed on.

    Every keyword whose name starts with ``__`` is dropped, except
    ``__opts__`` and ``__salt__``. Those two are filled in from the module
    level ``__opts__`` / ``__salt__`` whenever they are absent or falsy.
    Positional arguments are accepted but ignored.

    Returns the resulting keyword dictionary.
    '''
    shared = {'__opts__': __opts__, '__salt__': __salt__}
    private_keys = [
        key for key in kw
        if key.startswith('__') and key not in shared
    ]
    # Work on a copy as soon as anything has to be removed or injected.
    must_copy = bool(private_keys) or any(name not in kw for name in shared)
    if must_copy:
        kw = dict(kw)
    for key in private_keys:
        kw.pop(key, None)
    for name, value in six.iteritems(shared):
        if not kw.get(name):
            kw[name] = value
    return kw
if orchestrator_output:
del data['retcode']
# If additional information is passed through via the "data" dictionary to
# the highstate outputter, such as "outputter" or "retcode", discard it.
# We only want the state data that was passed through, if it is wrapped up
# in the "data" key, as the orchestrate runner does. See Issue #31330,
# pull request #27838, and pull request #27175 for more information.
if 'data' in data:
data = data.pop('data')
indent_level = kwargs.get('indent_level', 1)
ret = [
_format_host(host, hostdata, indent_level=indent_level)[0]
for host, hostdata in six.iteritems(data)
]
if ret:
return "\n".join(ret)
log.error(
'Data passed to highstate outputter is not a valid highstate return: %s',
data
)
# We should not reach here, but if we do return empty string
return ''
Return the targets from the flat yaml file, checks opts for location but
defaults to /etc/salt/roster
'''
template = get_roster_file(__opts__)
rend = salt.loader.render(__opts__, {})
raw = compile_template(template,
rend,
__opts__['renderer'],
__opts__['renderer_blacklist'],
__opts__['renderer_whitelist'],
mask_value='passw*',
**kwargs)
conditioned_raw = {}
for minion in raw:
conditioned_raw[six.text_type(minion)] = salt.config.apply_sdb(raw[minion])
rmatcher = RosterMatcher(conditioned_raw, tgt, tgt_type, 'ipv4', opts=__opts__)
return rmatcher.targets()
for saltenv, body in six.iteritems(top):
if self.opts['pillarenv']:
if saltenv != self.opts['pillarenv']:
continue
for match, data in six.iteritems(body):
if self.matchers['confirm_top.confirm_top'](
match,
data,
self.opts.get('nodegroups', {}),
):
if saltenv not in matches:
matches[saltenv] = env_matches = []
else:
env_matches = matches[saltenv]
for item in data:
if isinstance(item, six.string_types) and item not in env_matches:
env_matches.append(item)
return matches
def _numeric(n):
    '''
    Return True when ``n`` is an instance of one of six's integer types
    or of ``float``, False otherwise.
    '''
    numeric_types = salt.ext.six.integer_types + (float,)
    return isinstance(n, numeric_types)