Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_pkg_cap_003_installed_multipkg_with_version(self):
    '''
    Destructive test: it is meant to install and then remove two packages.

    On Arch-like systems the pacman database lock is polled first; the test
    aborts with an exception if the lock is still present after 60 seconds.
    Both the capability target and the real package must report an
    available version, otherwise new targets have to be chosen.
    '''
    on_arch = pre_grains and (
        'arch' in pre_grains.like() or 'archlinux' in pre_grains.like()
    )
    if on_arch:
        # Twelve polls, each followed by a five second sleep, before bailing.
        for _ in range(12):
            if not os.path.isfile('/var/lib/pacman/db.lck'):
                break
            time.sleep(5)
        else:
            raise Exception('Package database locked after 60 seconds, '
                            'bailing out')

    cap_name, provider = _PKG_CAP_TARGETS[0]
    version = self.latest_version(cap_name)
    realver = self.latest_version(provider)

    # A falsy version means the package is already installed (or unknown),
    # in which case this test cannot exercise a successful installation and
    # the targets must be replaced.
    self.assertTrue(version, 'new pkg cap targets required')
    self.assertTrue(realver, 'new pkg cap targets required')
# Worker that adds one to ``counter.value`` ``num`` times.  ``name`` and
# ``self`` are not defined in this block; they come from the enclosing scope.
def _incr(counter, num):
# Hand 'test_<name>' to appendproctitle before doing any counting.
salt.utils.process.appendproctitle('test_{0}'.format(name))
# One increment per iteration rather than a single ``+= num``.
for _ in range(0, num):
counter.value += 1
# Attach the worker to the instance under the attribute ``incr_<name>``.
setattr(self, 'incr_' + name, _incr)
def _random_name(prefix=''):
    '''
    Return ``prefix`` followed by eight random lowercase ASCII letters.
    '''
    letters = [random.choice(string.ascii_lowercase) for _ in range(8)]
    return prefix + ''.join(letters)
def test_is_bin_str(self):
    '''
    Check utils.is_bin_str against plain text, text containing a NUL
    character, the empty string, and strings padded with control characters.
    '''
    # Ordinary text is not binary.
    self.assertFalse(utils.is_bin_str(LORUM_IPSUM))

    # Text with a NUL character appended is reported as binary.
    with_nul = '{0}{1}'.format(LORUM_IPSUM, '\0')
    self.assertTrue(utils.is_bin_str(with_nul))

    # Ensure a safe exit if the string passed doesn't evaluate to True
    self.assertFalse(utils.is_bin_str(''))

    # Control characters 1..31, leaving out 8, 9, 10, 12 and 13, repeated
    # three times.
    control_chars = 3 * ''.join(
        chr(code) for code in range(1, 32) if code not in (8, 9, 10, 12, 13)
    )

    # 42 control characters after the first 100 characters of text must
    # still be reported as text ...
    borderline = '{0}{1}'.format(LORUM_IPSUM[:100], control_chars[:42])
    self.assertFalse(utils.is_bin_str(borderline))

    # ... while a single additional control character must flip the result.
    over_limit = borderline + '\x01'
    self.assertTrue(utils.is_bin_str(over_limit))
def __random_name(size=6):
    '''
    Build a random cloud instance name: ``CLOUD-TEST-`` followed by ``size``
    characters drawn from uppercase ASCII letters and digits.
    '''
    alphabet = string.ascii_uppercase + string.digits
    suffix = [random.choice(alphabet) for _ in range(size)]
    return 'CLOUD-TEST-' + ''.join(suffix)
def _sprinkle(config_str):
    '''
    Sprinkle with grains of salt: replace every ``{name}`` placeholder in
    ``config_str`` (e.g. 'test {id} test {host} ') with the string form of
    the grain called ``name``. Unknown grains become an empty string.
    '''
    # Cut the string at every brace; after flattening, the pieces at odd
    # positions are the texts that sat between '{' and '}'.
    pieces = []
    for chunk in config_str.split('{'):
        pieces.extend(chunk.split('}'))

    return ''.join(
        str(__grains__.get(piece, '')) if pos % 2 else piece
        for pos, piece in enumerate(pieces)
    )
# Visible portion: walks the list produced by helpers.translate_stringlist and
# parses every entry that is not already a dict from the textual form
# type=soft_limit[:hard_limit].
def ulimits(val, **kwargs): # pylint: disable=unused-argument
val = helpers.translate_stringlist(val)
for idx in range(len(val)):
# Entries that are already dicts are not parsed.
if not isinstance(val[idx], dict):
try:
# Split once on '=' into name and limits, then split the limits once
# on ':' into soft and (optional) hard limit.
ulimit_name, limits = \
helpers.split(val[idx], '=', 1)
comps = helpers.split(limits, ':', 1)
except (AttributeError, ValueError):
raise SaltInvocationError(
'Ulimit definition \'{0}\' is not in the format '
'type=soft_limit[:hard_limit]'.format(val[idx])
)
# Only one limit supplied: use the same value for soft and hard.
if len(comps) == 1:
comps *= 2
soft_limit, hard_limit = comps
try:
# The string entry is replaced in place by a dict; the soft limit is
# converted with int().
val[idx] = {'Name': ulimit_name,
'Soft': int(soft_limit),
def combine_comments(comments):
    '''
    Given a list of comments, or a comment submitted as a string, return a
    single line of text containing all of the comments.

    comments
        Either a list (its items are joined with single spaces, non-string
        items being converted with ``str()``), a single string, or any other
        object (converted with ``str()``).

    Returns the joined text with leading and trailing whitespace stripped.
    The list passed in is never modified.
    '''
    if isinstance(comments, list):
        # Build a new list instead of overwriting the caller's items, so the
        # argument keeps its original (possibly non-string) contents.
        parts = [
            item if isinstance(item, six.string_types) else str(item)
            for item in comments
        ]
    elif isinstance(comments, six.string_types):
        parts = [comments]
    else:
        parts = [str(comments)]
    return ' '.join(parts).strip()
base_information['ipv4']['netmask'] = settings['Subnet_Mask']
base_information['ipv4']['gateway'] = settings['Gateway']
base_information['ipv4']['dns'] = [settings['DNS_Address']]
# Otherwise, when the interface is up, read address and netmask from the
# interface object itself.
elif base_information['up']:
base_information['ipv4']['address'] = interface.sockaddrToStr(interface.addr)
base_information['ipv4']['netmask'] = interface.sockaddrToStr(interface.netmask)
# Placeholder gateway; replaced below if /proc/net/route has a matching line.
base_information['ipv4']['gateway'] = '0.0.0.0'
base_information['ipv4']['dns'] = _get_dns_info()
with salt.utils.files.fopen('/proc/net/route', 'r') as route_file:
# Match a line that starts with the interface name, a tab, eight zeros and a
# tab, and capture the following eight [0-9A-Z] characters.
# NOTE(review): the all-zero column is presumably the destination of the
# default route and the captured column the gateway - confirm against the
# /proc/net/route layout.
pattern = re.compile(r'^{interface}\t[0]{{8}}\t([0-9A-Z]{{8}})'.format(interface=interface.name),
re.MULTILINE)
match = pattern.search(route_file.read())
iface_gateway_hex = None if not match else match.group(1)
if iface_gateway_hex is not None and len(iface_gateway_hex) == 8:
# Convert the four hex byte pairs to decimal, taking them from the last pair
# to the first (offsets 6, 4, 2, 0), and join them with dots.
base_information['ipv4']['gateway'] = '.'.join([str(int(iface_gateway_hex[i:i + 2], 16))
for i in range(6, -1, -2)])
return base_information
Any kwargs required by the above function, with their keywords
i.e. StreamName=stream_name
Returns:
The result dict with the HTTP response and JSON data if applicable
as 'result', or an error as 'error'
CLI example::
salt myminion boto_kinesis._execute_with_retries existing_conn function_name function_kwargs
'''
# Result dict handed back to the caller ('result' and, on failure, 'error').
r = {}
# Upper bound on the number of calls, and the delay cap passed to
# _jittered_backoff.
max_attempts = 18
max_retry_delay = 10
for attempt in range(max_attempts):
log.info("attempt: {0} function: {1}".format(attempt, function))
try:
# Look the method up by name on the connection and call it with the
# supplied keyword arguments; the first successful call returns at once.
fn = getattr(conn, function)
r['result'] = fn(**kwargs)
return r
except botocore.exceptions.ClientError as e:
error_code = e.response['Error']['Code']
# Substring match, so any error code containing one of these names is retried.
if "LimitExceededException" in error_code or "ResourceInUseException" in error_code:
# could be rate limited by AWS or another command is blocking,
# retry with exponential backoff
log.debug("Retrying due to AWS exception {0}".format(e))
time.sleep(_jittered_backoff(attempt, max_retry_delay))
else:
# ResourceNotFoundException or InvalidArgumentException
# Any other error code is recorded in the result dict with no result value.
r['error'] = e.response['Error']
r['result'] = None