Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUpClass(cls):
if not os.path.isdir(RUNTIME_VARS.TMP):
os.makedirs(RUNTIME_VARS.TMP)
cls.root = os.path.join(RUNTIME_VARS.BASE_FILES, 'buildout')
cls.rdir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
cls.tdir = os.path.join(cls.rdir, 'test')
for idx, url in six.iteritems(buildout._URL_VERSIONS):
log.debug('Downloading bootstrap from %s', url)
dest = os.path.join(
cls.rdir, '{0}_bootstrap.py'.format(idx)
)
try:
download_to(url, dest)
except URLError:
log.debug('Failed to download %s', url)
# creating a new setuptools install
cls.ppy_st = os.path.join(cls.rdir, 'psetuptools')
if salt.utils.platform.is_windows():
cls.bin_st = os.path.join(cls.ppy_st, 'Scripts')
cls.py_st = os.path.join(cls.bin_st, 'python')
else:
cls.bin_st = os.path.join(cls.ppy_st, 'bin')
cls.py_st = os.path.join(cls.bin_st, 'python')
def test_present_when_domain_exists(self):
self.conn.describe_elasticsearch_domain.return_value = {'DomainStatus': domain_ret}
cfg = {}
for k, v in six.iteritems(domain_ret):
cfg[k] = {'Options': v}
cfg['AccessPolicies'] = {'Options': '{"a": "b"}'}
self.conn.describe_elasticsearch_domain_config.return_value = {'DomainConfig': cfg}
self.conn.update_elasticsearch_domain_config.return_value = {'DomainConfig': cfg}
result = self.salt_states['boto_elasticsearch_domain.present'](
'domain present',
**domain_ret)
self.assertTrue(result['result'])
self.assertEqual(result['changes'], {'new': {'AccessPolicies': {}}, 'old': {'AccessPolicies': {'a': 'b'}}})
try:
self_functions = pycopy.copy(self.functions)
salt.utils.lazy.verify_fun(self_functions, fun)
# Inject some useful globals to *all* the function's global
# namespace only once per module-- not per func
completed_funcs = []
for mod_name in six.iterkeys(self_functions):
if '.' not in mod_name:
continue
mod, _ = mod_name.split('.', 1)
if mod in completed_funcs:
continue
completed_funcs.append(mod)
for global_key, value in six.iteritems(func_globals):
self.functions[mod_name].__globals__[global_key] = value
# There are some discrepancies of what a "low" structure is in the
# publisher world it is a dict including stuff such as jid, fun,
# arg (a list of args, with kwargs packed in). Historically this
# particular one has had no "arg" and just has had all the kwargs
# packed into the top level object. The plan is to move away from
# that since the caller knows what is an arg vs a kwarg, but while
# we make the transition we will load "kwargs" using format_call if
# there are no kwargs in the low object passed in.
if 'arg' in low and 'kwarg' in low:
args = low['arg']
kwargs = low['kwarg']
else:
f_call = salt.utils.args.format_call(
def encode_dict(data, encoding=None, errors='strict', keep=False,
preserve_dict_class=False, preserve_tuples=False):
'''
Encode all string values to bytes
'''
rv = data.__class__() if preserve_dict_class else {}
for key, value in six.iteritems(data):
if isinstance(key, tuple):
key = encode_tuple(key, encoding, errors, keep, preserve_dict_class) \
if preserve_tuples \
else encode_list(key, encoding, errors, keep,
preserve_dict_class, preserve_tuples)
else:
try:
key = salt.utils.stringutils.to_bytes(key, encoding, errors)
except TypeError:
# to_bytes raises a TypeError when input is not a
# string/bytestring/bytearray. This is expected and simply
# means we are going to leave the value as-is.
pass
except UnicodeEncodeError:
if not keep:
raise
opts = dict(
(six.text_type(x), _normalize(kwargs[x]))
for x in kwargs
if not x.startswith('_')
)
bad_opts = [x for x in opts if x not in conf_ptr]
if bad_opts:
raise SaltInvocationError(
'The following opts are not valid for port {0}: {1}'
.format(name, ', '.join(bad_opts))
)
bad_vals = [
'{0}={1}'.format(x, y) for x, y in six.iteritems(opts)
if y not in ('on', 'off')
]
if bad_vals:
raise SaltInvocationError(
'The following key/value pairs are invalid: {0}'
.format(', '.join(bad_vals))
)
conf_ptr.update(opts)
_write_options(name, configuration)
new_config = showconfig(name, dict_return=True)
try:
new_config = new_config[next(iter(new_config))]
except (StopIteration, TypeError):
return False
attrs = ['ip_protocol', 'from_port', 'to_port', 'grants']
_rule = odict.OrderedDict()
for attr in attrs:
val = getattr(rule, attr)
if not val:
continue
if attr == 'grants':
_grants = []
for grant in val:
log.debug('examining grant %s for', grant)
g_attrs = {'name': 'source_group_name',
'owner_id': 'source_group_owner_id',
'group_id': 'source_group_group_id',
'cidr_ip': 'cidr_ip'}
_grant = odict.OrderedDict()
for g_attr, g_attr_map in six.iteritems(g_attrs):
g_val = getattr(grant, g_attr)
if not g_val:
continue
_grant[g_attr_map] = g_val
_grants.append(_grant)
_rule['grants'] = _grants
elif attr == 'from_port':
_rule[attr] = int(val)
elif attr == 'to_port':
_rule[attr] = int(val)
else:
_rule[attr] = val
_rules.append(_rule)
return _rules
def _decrypt_object(obj):
"""
Recursively try to find a pass path (string) that can be handed off to pass
"""
if isinstance(obj, six.string_types):
return _fetch_secret(obj)
elif isinstance(obj, dict):
for pass_key, pass_path in six.iteritems(obj):
obj[pass_key] = _decrypt_object(pass_path)
elif isinstance(obj, list):
for pass_key, pass_path in enumerate(obj):
obj[pass_key] = _decrypt_object(pass_path)
return obj
backend_policies = set(b.get('policies', []))
actual_policies_by_backend[b['instance_port']] = backend_policies
to_delete = []
to_create = []
for policy_name in expected_policy_names:
if policy_name not in actual_policy_names:
to_create.append(policy_name)
for policy_name in actual_policy_names:
if policy_name not in expected_policy_names:
if policy_name not in default_aws_policies:
to_delete.append(policy_name)
listeners_to_update = set()
for port, policies in six.iteritems(expected_policies_by_listener):
if policies != actual_policies_by_listener.get(port, set()):
listeners_to_update.add(port)
for port, policies in six.iteritems(actual_policies_by_listener):
if policies != expected_policies_by_listener.get(port, set()):
listeners_to_update.add(port)
backends_to_update = set()
for port, policies in six.iteritems(expected_policies_by_backend):
if policies != actual_policies_by_backend.get(port, set()):
backends_to_update.add(port)
for port, policies in six.iteritems(actual_policies_by_backend):
if policies != expected_policies_by_backend.get(port, set()):
backends_to_update.add(port)
if __opts__['test']:
msg = []
def _yaml_result_unicode_to_utf8(data):
''''
Replace `unicode` strings by utf-8 `str` in final yaml result
This is a recursive function
'''
if isinstance(data, OrderedDict):
for key, elt in six.iteritems(data):
data[key] = _yaml_result_unicode_to_utf8(elt)
elif isinstance(data, list):
for i in range(len(data)):
data[i] = _yaml_result_unicode_to_utf8(data[i])
elif isinstance(data, six.text_type):
# here also
data = data.encode('utf-8')
return data