Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_get_url_file(self):
    '''
    Call cp.get_url with a file:// source and an empty target, then
    check the contents of the file whose path the call returns
    '''
    source_url = os.path.join('file://', paths.FILES, 'file', 'base', 'file.big')
    destination = ''
    fetched_path = self.run_function('cp.get_url', [source_url, destination])

    # The returned value is used as a path: open it and decode its contents
    with salt.utils.files.fopen(fetched_path, 'r') as handle:
        contents = salt.utils.stringutils.to_unicode(handle.read())

    self.assertIn('KNIGHT: They\'re nervous, sire.', contents)
    self.assertNotIn('bacon', contents)
def test_autosign_grains_accept(self):
    '''
    Write the 'test_grain' autosign file, make the minion run a command
    and check that 'minion' appears in the output of run_key('-l acc')
    '''
    grain_file = os.path.join(self.autosign_grains_dir, 'test_grain')
    with salt.utils.files.fopen(grain_file, 'w') as handle:
        content = salt.utils.stringutils.to_str('#invalid_value\ncheese')
        handle.write(content)
    os.chmod(grain_file, self.autosign_file_permissions)

    # Get the minion to try to authenticate itself again
    self.run_call('test.ping -l quiet')

    listed_keys = self.run_key('-l acc')
    self.assertIn('minion', listed_keys)
'''
Test the (mis-)behaviour of file.replace as described in #18612:
Using 'prepend_if_not_found' or 'append_if_not_found' resulted in
an infinitely growing file as 'file.replace' didn't check beforehand
whether the changes had already been done to the file
# Case description:
The tested file contains one commented line
The commented line should be uncommented in the end, nothing else should change
'''
test_name = 'test_replace_issue_18612'
path_test = os.path.join(TMP, test_name)
# Seed the test file with a single commented-out line (no trailing newline)
with salt.utils.files.fopen(path_test, 'w+') as fp_test_:
fp_test_.write('# en_US.UTF-8')
# Apply the identical file.replace state three times and keep every result;
# repeated runs are what used to make the file grow
ret = []
for x in range(0, 3):
ret.append(self.run_state('file.replace',
name=path_test, pattern='^# en_US.UTF-8$', repl='en_US.UTF-8', append_if_not_found=True))
# NOTE(review): the handler/cleanup clause of this try is not visible in this excerpt
try:
# ensure, the number of lines didn't change, even after invoking 'file.replace' 3 times
with salt.utils.files.fopen(path_test, 'r') as fp_test_:
self.assertTrue((sum(1 for _ in fp_test_) == 1))
# ensure, the replacement succeeded
with salt.utils.files.fopen(path_test, 'r') as fp_test_:
self.assertTrue(fp_test_.read().startswith('en_US.UTF-8'))
# Register extra aliases through hosts.add_host; every call must return a
# truthy value. 192.168.1.2 receives two aliases, 192.168.1.1 receives one.
self.assertTrue(
self.run_function('hosts.add_host', ['192.168.1.2', 'oldhost2'])
)
self.assertTrue(
self.run_function(
'hosts.add_host', ['192.168.1.2', 'host2-reorder']
)
)
self.assertTrue(
self.run_function(
'hosts.add_host', ['192.168.1.1', 'host1-reorder']
)
)
# now read the lines and ensure they're formatted correctly
with salt.utils.files.fopen(HFN, 'r') as fp_:
lines = salt.utils.stringutils.to_unicode(fp_.read()).splitlines()
# Expected layout: address, two tabs, then space-separated names with the
# new aliases appended in the order they were added above
self.assertEqual(lines, [
'192.168.1.3\t\thost3.fqdn.com',
'192.168.1.1\t\thost1.fqdn.com host1 host1-reorder',
'192.168.1.2\t\thost2.fqdn.com host2 oldhost2 host2-reorder',
])
# NOTE(review): this excerpt starts inside a branch whose opening `if` and
# loop header are not visible; presumably it iterates the items of a dict
# `config` -- confirm against the full function.
log.debug('counting strings in dict val: %s', val)
# Both the key and the value are inspected
_count_strings(key)
_count_strings(val)
elif isinstance(config, list):
log.debug('counting strings in list: %s', config)
# Lists: inspect every element
for item in config:
_count_strings(item)
else:
# Any other value: only string values are tallied, everything else is ignored
if isinstance(config, six.string_types):
if isinstance(config, six.text_type):
# Text (unicode) strings are only counted
tally['unicode'] = tally.get('unicode', 0) + 1
else:
# We will never reach this on PY3
# Non-text strings are collected themselves rather than counted
tally.setdefault('non_unicode', []).append(config)
with salt.utils.files.fopen(fpath, 'w') as wfh:
wfh.write(textwrap.dedent('''
foo: bar
mylist:
- somestring
- 9
- 123.456
- True
- nested:
- key: val
- nestedlist:
- foo
- bar
- baz
mydict:
- somestring: 9
- 123.456: 789
)
)
# For each name in SUBDIR_FILES, build the path
# <cachedir>/files/<saltenv>/<SUBDIR>/<name> and inspect that file's contents
for subdir_file in SUBDIR_FILES:
cache_loc = os.path.join(fileclient.__opts__['cachedir'],
'files',
saltenv,
SUBDIR,
subdir_file)
# Double check that the content of the cached file
# identifies it as being from the correct saltenv. The
# setUp function creates the file with the name of the
# saltenv mentioned in the file, so a simple 'in' check is
# sufficient here. If opening the file raises an exception,
# this is a problem, so we are not catching the exception
# and letting it be raised so that the test fails.
with salt.utils.files.fopen(cache_loc) as fp_:
content = fp_.read()
log.debug('cache_loc = %s', cache_loc)
log.debug('content = %s', content)
# The content must mention the file name, the subdirectory and the saltenv
self.assertTrue(subdir_file in content)
self.assertTrue(SUBDIR in content)
self.assertTrue(saltenv in content)
# Open 'baz.txt' (the handle is bound to self.fh3) and compare what is read
# against the expected lines in `answers`
with salt.utils.files.fopen('baz.txt', mode) as self.fh3:
# Start with a partial read of the first `size` characters of line one
size = 20
result = self.fh3.read(size)
assert result == answers[0][:size], result
# Use .readline() to read the remainder of the line
result = self.fh3.readline()
assert result == answers[0][size:], result
# Read and check the other two lines
result = self.fh3.readline()
assert result == answers[1], result
result = self.fh3.readline()
assert result == answers[2], result
# Opening 'helloworld.txt' is expected to fail with IOError. If the open
# succeeds, the generic Exception raised below is not an IOError, so it is
# not caught here and propagates.
try:
with salt.utils.files.fopen('helloworld.txt'):
raise Exception('No globs should have matched')
except IOError:
# An IOError is expected here
pass
def _get_local_groups(self):
    '''
    Return all known local groups to the system.

    Parses ``/etc/group`` and returns a dict keyed by group name. Each
    value is a dict holding the group's ``gid`` (kept as the string read
    from the file) and, when the group has members, a ``users`` list.

    Blank lines, comment lines and lines that do not have exactly four
    colon-separated fields are skipped instead of raising ``ValueError``
    during unpacking.
    '''
    groups = {}
    path = '/etc/group'
    with salt.utils.files.fopen(path, 'r') as fp_:
        for line in fp_:
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            # A group entry is name:password:gid:user_list; anything else
            # (including lines without any ':') is not an entry
            fields = line.split(':')
            if len(fields) != 4:
                continue
            name, _, gid, users = fields
            groups[name] = {
                'gid': gid,
            }
            if users:
                groups[name]['users'] = users.split(',')
    return groups
# NOTE(review): the `try` this handler belongs to is not visible in this excerpt
except StopIteration:
raise SaltInvocationError(
'unable to find keygrip associated with fingerprint \'{0}\' for keyid \'{1}\''
.format(local_key_fingerprint, local_keyid)
)
# No matching key was found: report the requested key ID and the keyring location
if local_keyid is None:
raise SaltInvocationError(
'The key ID \'{0}\' was not found in GnuPG keyring at \'{1}\''
.format(keyid, gnupghome)
)
_check_repo_sign_utils_support('debsign')
# Load one KEY=VALUE pair into `env`; which file is read depends on `older_gnupg`
if older_gnupg:
with salt.utils.files.fopen(gpg_info_file, 'r') as fow:
gpg_raw_info = fow.readlines()
# Only the first line is used: the loop breaks after one iteration.
# NOTE(review): the value is everything after the first '=' up to the next
# '=', and presumably still carries the line's trailing newline -- confirm
for gpg_info_line in gpg_raw_info:
gpg_info_line = salt.utils.stringutils.to_unicode(gpg_info_line)
gpg_info = gpg_info_line.split('=')
env[gpg_info[0]] = gpg_info[1]
break
else:
with salt.utils.files.fopen(gpg_tty_info_file, 'r') as fow:
gpg_raw_info = fow.readlines()
# Same handling as the branch above, applied to the tty info file
for gpg_tty_info_line in gpg_raw_info:
gpg_tty_info_line = salt.utils.stringutils.to_unicode(gpg_tty_info_line)
gpg_tty_info = gpg_tty_info_line.split('=')
env[gpg_tty_info[0]] = gpg_tty_info[1]
break
# Search the *.conf files in the xbps.d directories for lines declaring
# `repo` as a repository. Matching file names are collected in ret_val and,
# when `rewrite` is truthy, those files are rewritten without the matching lines.
ret_val = []
files = []
conf_dirs = ['/etc/xbps.d/', '/usr/share/xbps.d/']
name_glob = '*.conf'
# Matches a line where first printing is "repository" and there is an equals
# sign before the repo, an optional forwardslash at the end of the repo name,
# and it's possible for there to be a comment after repository=repo
# NOTE(review): `repo` is inserted into the pattern unescaped, so regex
# metacharacters in it (e.g. '.') are treated as regex syntax -- consider
# re.escape(repo)
regex = re.compile(r'\s*repository\s*=\s*'+repo+r'/?\s*(#.*)?$')
for cur_dir in conf_dirs:
files.extend(glob.glob(cur_dir+name_glob))
for filename in files:
# write_buff keeps the lines that do NOT match, i.e. the content to retain
write_buff = []
with salt.utils.files.fopen(filename, 'r') as cur_file:
for line in cur_file:
if regex.match(salt.utils.stringutils.to_unicode(line)):
# Appended once per matching line, so a file name can appear more than once
ret_val.append(filename)
else:
write_buff.append(line)
# Only files that contained at least one match are rewritten or removed
if rewrite and filename in ret_val:
if len(write_buff) > 0:
with salt.utils.files.fopen(filename, 'w') as rewrite_file:
rewrite_file.writelines(write_buff)
else: # Prune empty files
os.remove(filename)
return ret_val