Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
self.tmpdir = salt.utils.stringutils.to_unicode(tempfile.mkdtemp())
default_only : boolean
return only default profile
CLI Example:
.. code-block:: bash
salt '*' rbac.profile_list
'''
profiles = {}
default_profiles = ['All']
## lookup default profile(s)
with salt.utils.files.fopen('/etc/security/policy.conf', 'r') as policy_conf:
for policy in policy_conf:
policy = salt.utils.stringutils.to_unicode(policy)
policy = policy.split('=')
if policy[0].strip() == 'PROFS_GRANTED':
default_profiles.extend(policy[1].strip().split(','))
## read prof_attr file (profname:res1:res2:desc:attr)
with salt.utils.files.fopen('/etc/security/prof_attr', 'r') as prof_attr:
for profile in prof_attr:
profile = salt.utils.stringutils.to_unicode(profile)
profile = profile.split(':')
# skip comments and non complaint lines
if len(profile) != 5:
continue
# add profile info to dict
profiles[profile[0]] = profile[3]
def _mod_repo_in_file(repo, repostr, filepath):
'''
Replace a repo entry in filepath with repostr
'''
with salt.utils.files.fopen(filepath) as fhandle:
output = []
for line in fhandle:
cols = salt.utils.args.shlex_split(
salt.utils.stringutils.to_unicode(line).strip()
)
if repo not in cols:
output.append(line)
else:
output.append(salt.utils.stringutils.to_str(repostr + '\n'))
with salt.utils.files.fopen(filepath, 'w') as fhandle:
fhandle.writelines(output)
def _unify_keywords():
'''
Merge /etc/portage/package.keywords and
/etc/portage/package.accept_keywords.
'''
old_path = BASE_PATH.format('keywords')
if os.path.exists(old_path):
if os.path.isdir(old_path):
for triplet in salt.utils.path.os_walk(old_path):
for file_name in triplet[2]:
file_path = '{0}/{1}'.format(triplet[0], file_name)
with salt.utils.files.fopen(file_path) as fh_:
for line in fh_:
line = salt.utils.stringutils.to_unicode(line).strip()
if line and not line.startswith('#'):
append_to_package_conf(
'accept_keywords', string=line)
shutil.rmtree(old_path)
else:
with salt.utils.files.fopen(old_path) as fh_:
for line in fh_:
line = salt.utils.stringutils.to_unicode(line).strip()
if line and not line.startswith('#'):
append_to_package_conf('accept_keywords', string=line)
os.remove(old_path)
grains['virtual_subtype'] = 'LXC'
else:
if any(x in fhr_contents
for x in (':/system.slice/docker', ':/docker/',
':/docker-ce/')):
grains['virtual_subtype'] = 'Docker'
except IOError:
pass
if os.path.isfile('/proc/cpuinfo'):
with salt.utils.files.fopen('/proc/cpuinfo', 'r') as fhr:
if 'QEMU Virtual CPU' in fhr.read():
grains['virtual'] = 'kvm'
if os.path.isfile('/sys/devices/virtual/dmi/id/product_name'):
try:
with salt.utils.files.fopen('/sys/devices/virtual/dmi/id/product_name', 'r') as fhr:
output = salt.utils.stringutils.to_unicode(fhr.read(), errors='replace')
if 'VirtualBox' in output:
grains['virtual'] = 'VirtualBox'
elif 'RHEV Hypervisor' in output:
grains['virtual'] = 'kvm'
grains['virtual_subtype'] = 'rhev'
elif 'oVirt Node' in output:
grains['virtual'] = 'kvm'
grains['virtual_subtype'] = 'ovirt'
elif 'Google' in output:
grains['virtual'] = 'gce'
elif 'BHYVE' in output:
grains['virtual'] = 'bhyve'
except IOError:
pass
elif osdata['kernel'] == 'FreeBSD':
kenv = salt.utils.path.which('kenv')
return ret
else:
fp_.seek(loc)
while True:
now = int(time.time())
raw = fp_.read(SIZE)
if len(raw) != SIZE:
return ret
__context__[LOC_KEY] = fp_.tell()
pack = struct.unpack(FMT, raw)
event = {}
for ind, field in enumerate(FIELDS):
event[field] = pack[ind]
if isinstance(event[field], salt.ext.six.string_types):
if isinstance(event[field], bytes):
event[field] = salt.utils.stringutils.to_unicode(event[field])
event[field] = event[field].strip('\x00')
if users:
if event['user'] in users:
_user = users[event['user']]
if isinstance(_user, dict) and 'time_range' in _user:
if _check_time_range(_user['time_range'], now):
ret.append(event)
else:
if defaults and 'time_range' in defaults:
if _check_time_range(defaults['time_range'],
now):
ret.append(event)
else:
ret.append(event)
else:
if web_server_type not in web_server_types:
salt.utils.namecheap.log.error('Invalid option for web_server_type=' + web_server_type)
raise Exception('Invalid option for web_server_type=' + web_server_type)
if approver_email is not None and http_dc_validation:
salt.utils.namecheap.log.error('approver_email and http_dc_validation cannot both have values')
raise Exception('approver_email and http_dc_validation cannot both have values')
if approver_email is None and not http_dc_validation:
salt.utils.namecheap.log.error('approver_email or http_dc_validation must have a value')
raise Exception('approver_email or http_dc_validation must have a value')
opts = salt.utils.namecheap.get_opts(command)
with salt.utils.files.fopen(csr_file, 'rb') as csr_handle:
opts['csr'] = salt.utils.stringutils.to_unicode(
csr_handle.read()
)
opts['CertificateID'] = certificate_id
opts['WebServerType'] = web_server_type
if approver_email is not None:
opts['ApproverEmail'] = approver_email
if http_dc_validation:
opts['HTTPDCValidation'] = 'True'
for key, value in six.iteritems(kwargs):
opts[key] = value
response_xml = salt.utils.namecheap.post_request(opts)
source_sum=source_sum,
user=None,
group=None,
mode=None,
saltenv='base',
backup=None,
makedirs=True,
template=None,
show_changes=False,
contents=None,
dir_mode=None)
if (template_manage_result['result']) or \
((__opts__['test']) and (template_manage_result['result'] is not False)):
with salt.utils.files.fopen(template_tmp_file, 'r') as tfp_:
tpl = salt.utils.stringutils.to_unicode(tfp_.read())
salt.utils.files.safe_rm(template_tmp_file)
try:
template_parse = _parse_template(tpl)
if 'heat_template_version' in template_parse:
template_new = salt.utils.yaml.safe_dump(template_parse)
else:
template_new = jsonutils.dumps(template_parse, indent=2, ensure_ascii=False)
salt.utils.files.safe_rm(template_tmp_file)
except ValueError as ex:
ret['result'] = False
ret['comment'] = 'Error parsing template {0}'.format(ex)
else:
ret['result'] = False
ret['comment'] = 'Can not open template: {0} {1}'.format(template, comment_)
else:
ret['result'] = False
def _init_auth(self):
if self.auth:
return
low = {}
skip_perm_errors = self.opts['eauth'] != ''
if self.opts['eauth']:
if 'token' in self.opts:
try:
with salt.utils.files.fopen(os.path.join(self.opts['cachedir'], '.root_key'), 'r') as fp_:
low['key'] = \
salt.utils.stringutils.to_unicode(fp_.readline())
except IOError:
low['token'] = self.opts['token']
#
# If using eauth and a token hasn't already been loaded into
# low, prompt the user to enter auth credentials
if 'token' not in low and 'key' not in low and self.opts['eauth']:
# This is expensive. Don't do it unless we need to.
resolver = salt.auth.Resolver(self.opts)
res = resolver.cli(self.opts['eauth'])
if self.opts['mktoken'] and res:
tok = resolver.token_cli(
self.opts['eauth'],
res
)
if tok:
low['token'] = tok.get('token', '')
def _normalize_dir(string_):
'''
Normalize the directory to make comparison possible
'''
return re.sub(r'\\$', '', salt.utils.stringutils.to_unicode(string_))