Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_set_setting_failed(self):
'''
Test if it fails to set a new value for a specific configuration line
'''
with patch('salt.modules.logrotate._parse_conf', MagicMock(return_value=PARSE_CONF)):
kwargs = {'key': 'rotate',
'value': '/var/log/wtmp',
'setting': '2'}
self.assertRaises(SaltInvocationError, logrotate.set_, **kwargs)
'''
ret = {'pkg_|-name=vim_|-name=vim_|-installed': list}
mock = MagicMock(side_effect=["A", None, None, None, None])
with patch.object(state, '_check_queue', mock):
self.assertEqual(state.single("pkg.installed",
" name=vim"), "A")
self.assertEqual(state.single("pk", "name=vim"),
"Invalid function passed")
with patch.dict(state.__opts__, {"test": "install"}):
mock = MagicMock(return_value={"test": ""})
with patch.object(salt.utils.state, 'get_sls_opts', mock):
mock = MagicMock(return_value=True)
with patch.object(salt.utils.args, 'test_mode', mock):
self.assertRaises(SaltInvocationError,
state.single,
"pkg.installed",
"name=vim",
pillar="A")
MockState.State.flag = True
self.assertTrue(state.single("pkg.installed",
"name=vim"))
MockState.State.flag = False
self.assertDictEqual(state.single("pkg.installed",
"name=vim"), ret)
return 1
def incr(self, key, delta):
"""
Mock of incr method
"""
self.key = key
if not isinstance(delta, integer_types):
raise SaltInvocationError('Delta value must be an integer')
return key
with patch.object(memcached, '_connect',
MagicMock(return_value=MockMemcache())):
self.assertEqual(memcached.increment('salt'), 'salt')
self.assertRaises(SaltInvocationError, memcached.increment,
'salt', delta='sa')
return True
# This is the new config line that will be set
if value is True:
new_line = key
elif value:
new_line = '{0} {1}'.format(key, value)
kwargs.update({'prepend_if_not_found': True})
else:
stanza = conf.get(key, dict())
if stanza and not isinstance(stanza, dict):
error_msg = ('Error: A setting for a dict was declared, but the '
'configuration line given is not a dict')
raise SaltInvocationError(error_msg)
if setting == stanza.get(value, False):
_LOG.debug("Command '%s' already has: %s", value, setting)
return True
# We're going to be rewriting an entire stanza
if setting:
stanza[value] = setting
else:
del stanza[value]
new_line = _dict_to_stanza(key, stanza)
kwargs.update({
'pattern': '^{0}.*?{{.*?}}'.format(key),
'flags': 24,
def _get_group(conn=None, name=None, vpc_id=None, vpc_name=None, group_id=None,
region=None, key=None, keyid=None, profile=None): # pylint: disable=W0613
'''
Get a group object given a name, name and vpc_id/vpc_name or group_id. Return
a boto.ec2.securitygroup.SecurityGroup object if the group is found, else
return None.
'''
if vpc_name and vpc_id:
raise SaltInvocationError('The params \'vpc_id\' and \'vpc_name\' '
'are mutually exclusive.')
if vpc_name:
try:
vpc_id = _vpc_name_to_id(vpc_id=vpc_id, vpc_name=vpc_name, region=region,
key=key, keyid=keyid, profile=profile)
except boto.exception.BotoServerError as e:
log.debug(e)
return None
if name:
if vpc_id is None:
log.debug('getting group for %s', name)
group_filter = {'group-name': name}
filtered_groups = conn.get_all_security_groups(filters=group_filter)
# security groups can have the same name if groups exist in both
# EC2-Classic and EC2-VPC
# iterate through groups to ensure we return the EC2-Classic
keyid
Access key to be used.
profile
Dict, or pillar key pointing to a dict, containing AWS region/key/keyid.
CLI Example:
.. code-block:: bash
salt myminion boto3_route53.find_hosted_zone Name=salt.org. \
profile='{"region": "us-east-1", "keyid": "A12345678AB", "key": "xblahblahblah"}'
'''
if not _exactly_one((Id, Name)):
raise SaltInvocationError('Exactly one of either Id or Name is required.')
if PrivateZone is not None and not isinstance(PrivateZone, bool):
raise SaltInvocationError('If set, PrivateZone must be a bool (e.g. True / False).')
if Id:
ret = get_hosted_zone(Id, region=region, key=key, keyid=keyid, profile=profile)
else:
ret = get_hosted_zones_by_domain(Name, region=region, key=key, keyid=keyid, profile=profile)
if PrivateZone is not None:
ret = [m for m in ret if m['HostedZone']['Config']['PrivateZone'] is PrivateZone]
if len(ret) > 1:
log.error(
'Request matched more than one Hosted Zone (%s). Refine your '
'criteria and try again.', [z['HostedZone']['Id'] for z in ret]
)
ret = []
return ret
def _elb_present(name, availability_zones, listeners, subnets, subnet_names,
security_groups, scheme, region, key, keyid, profile):
ret = {'result': True, 'comment': '', 'changes': {}}
if not salt.utils.data.exactly_one((availability_zones, subnets, subnet_names)):
raise SaltInvocationError('Exactly one of availability_zones, subnets, '
'subnet_names must be provided as arguments.')
if not listeners:
listeners = []
for listener in listeners:
if len(listener) < 3:
raise SaltInvocationError('Listeners must have at minimum port,'
' instance_port and protocol values in'
' the provided list.')
if 'elb_port' not in listener:
raise SaltInvocationError('elb_port is a required value for'
' listeners.')
if 'instance_port' not in listener:
raise SaltInvocationError('instance_port is a required value for'
' listeners.')
if 'elb_protocol' not in listener:
raise SaltInvocationError('elb_protocol is a required value for'
' listeners.')
listener['elb_protocol'] = listener['elb_protocol'].upper()
if listener['elb_protocol'] == 'HTTPS' and 'certificate' not in listener:
raise SaltInvocationError('certificate is a required value for'
' listeners if HTTPS is set for'
' elb_protocol.')
def _get_build_env(env):
'''
Get build environment overrides dictionary to use in build process
'''
env_override = ''
if env is None:
return env_override
if not isinstance(env, dict):
raise SaltInvocationError(
'\'env\' must be a Python dictionary'
)
for key, value in env.items():
env_override += '{0}={1}\n'.format(key, value)
env_override += 'export {0}\n'.format(key)
return env_override
def _delete_resource(resource, name=None, resource_id=None, region=None,
key=None, keyid=None, profile=None, **kwargs):
'''
Delete a VPC resource. Returns True if successful, otherwise False.
'''
if not _exactly_one((name, resource_id)):
raise SaltInvocationError('One (but not both) of name or id must be '
'provided.')
try:
conn = _get_conn(region=region, key=key, keyid=keyid, profile=profile)
try:
delete_resource = getattr(conn, 'delete_' + resource)
except AttributeError:
raise AttributeError('{0} function does not exist for boto VPC '
'connection.'.format('delete_' + resource))
if name:
resource_id = _get_resource_id(resource, name,
region=region, key=key,
keyid=keyid, profile=profile)
if not resource_id:
return {'deleted': False, 'error': {'message':