Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import salt.utils
from salt.modules import mysql as mysqlmod
# Import 3rd-party libs
import salt.ext.six as six
from salt.ext.six.moves import range # pylint: disable=import-error,redefined-builtin
# Module-level logger for this test module.
log = logging.getLogger(__name__)

# Work out whether the MySQL-dependent tests can run: they need both the
# MySQLdb Python bindings and the ``mysqladmin`` binary on the PATH.
try:
    import MySQLdb  # pylint: disable=import-error,unused-import
except Exception:
    # Any failure while importing the bindings disables the tests.
    NO_MYSQL = True
else:
    NO_MYSQL = False

if not salt.utils.which('mysqladmin'):
    NO_MYSQL = True
@skipIf(
    NO_MYSQL,
    # Adjacent string literals are joined verbatim, so the first piece has to
    # end with a space; otherwise the reason reads "...before runningMySQL".
    'Please install MySQL bindings and a MySQL Server before running '
    'MySQL returner archiver integration tests.'
)
@destructiveTest
class MysqlReturnerArchiverTest(integration.ShellCase,
                                integration.SaltReturnAssertsMixIn):
    '''
    Module testing the MySQL returner archiver

    The whole class is skipped when ``NO_MYSQL`` is true and is marked with
    ``destructiveTest``.
    '''
def _dmidecode_data(regex_dict):
'''
Parse the output of dmidecode in a generic fashion that can
be used for the multiple system types which have dmidecode.
'''
ret = {}
# No use running if dmidecode/smbios isn't in the path
if salt.utils.which('dmidecode'):
out = __salt__['cmd.run']('dmidecode')
elif salt.utils.which('smbios'):
out = __salt__['cmd.run']('smbios')
else:
return ret
for section in regex_dict:
section_found = False
# Look at every line for the right section
for line in out.splitlines():
if not line:
continue
# We've found it, woohoo!
if re.match(section, line):
section_found = True
continue
if not section_found:
def __virtual__():
    '''
    Expose this module as ``raid``, but only on a Linux kernel where the
    ``mdadm`` binary can be found on the PATH; otherwise return False.
    '''
    on_linux = __grains__['kernel'] == 'Linux'
    # ``and`` short-circuits, so mdadm is only looked up on Linux hosts.
    if on_linux and salt.utils.which('mdadm'):
        return 'raid'
    return False
example4:
at.absent:
- limit: all
- tag: rose
- day: 13
- hour: 16
'''
if 'limit' in kwargs:
name = kwargs['limit']
ret = {'name': name,
'changes': {},
'result': True,
'comment': ''}
binary = salt.utils.which('at')
if __opts__['test']:
ret['result'] = None
ret['comment'] = 'Remove jobs()'
return ret
if name != 'all':
ret['comment'] = 'limit parameter not supported {0}'.format(name)
ret['result'] = False
return ret
#if jobid:
# output = __salt__['cmd.run']('{0} -d {1}'.format(binary, jobid))
# if i in map(str, [j['job'] for j in __salt__['at.atq']()['jobs']]):
# ret['result'] = False
# return ret
def __virtual__():
    '''
    Expose this module as ``debconf`` only when the minion belongs to the
    Debian OS family and ``debconf-get-selections`` is found on the PATH;
    otherwise return False.
    '''
    is_debian = __grains__['os_family'] == 'Debian'
    # ``or`` short-circuits, so the binary is only looked up on Debian hosts.
    if not is_debian or salt.utils.which('debconf-get-selections') is None:
        return False
    return 'debconf'
def _cmd_str(self, cmd, ssh='ssh'):
    '''
    Return the cmd string to execute

    cmd
        Text appended to the end of the generated command line.
    ssh
        Client binary to invoke. When it is ``scp`` the host is left out of
        the generated string.

    Returns the command string, or ``None`` when neither a password usable
    through ``sshpass`` (the binary must be on the PATH) nor a private key
    is configured.
    '''
    # TODO: if tty, then our SSH_SHIM cannot be supplied from STDIN Will
    # need to deliver the SHIM to the remote host and execute it there
    if self.passwd and salt.utils.which('sshpass'):
        # The password is placed between single quotes in a shell command.
        # Escape embedded single quotes (' -> '\'') so a password containing
        # one can neither terminate the quoting nor inject extra shell
        # syntax. Passwords without a single quote are emitted unchanged.
        escaped_passwd = self.passwd.replace("'", "'\\''")
        prefix = "sshpass -p '{0}' ".format(escaped_passwd)
        opts = self._passwd_opts()
    elif self.priv:
        prefix = ''
        opts = self._key_opts()
    else:
        # No usable credentials: make the "nothing to run" result explicit.
        return None

    return '{0}{1} {2} {3} {4} {5}'.format(
        prefix,
        ssh,
        '' if ssh == 'scp' else self.host,
        '-t -t' if self.tty else '',
        opts,
        cmd)
def __virtual__():
    '''
    Load under ``__virtualname__`` only when both ``varnishd`` and
    ``varnishadm`` are found on the PATH; otherwise return a
    ``(False, reason)`` tuple.
    '''
    required_binaries = ('varnishd', 'varnishadm')
    # all() stops at the first missing binary, like the chained ``and``.
    if all(salt.utils.which(binary) for binary in required_binaries):
        return __virtualname__
    return (
        False,
        'The varnish execution module failed to load: either varnishd or varnishadm is not in the path.'
    )
def __virtual__():
    '''
    Return True when the ``bower`` command is found on the PATH, otherwise
    a ``(False, reason)`` tuple.
    '''
    if salt.utils.which('bower') is not None:
        return True
    return (False, 'The bower module could not be loaded: bower command not found')
def ext_nodes(self):
'''
Return the metadata derived from the external nodes system on the local
system
'''
if not self.opts['external_nodes']:
return {}
if not salt.utils.which(self.opts['external_nodes']):
log.error(('Specified external nodes controller {0} is not'
' available, please verify that it is installed'
'').format(self.opts['external_nodes']))
return {}
cmd = '{0} {1}'.format(self.opts['external_nodes'], self.opts['id'])
ndata = yaml.safe_load(subprocess.Popen(
cmd,
shell=True,
stdout=subprocess.PIPE
).communicate()[0])
ret = {}
if 'environment' in ndata:
env = ndata['environment']
else:
env = 'base'
# Tested on Fedora 15 / 2.6.41.4-1 without running xen
elif isdir('/sys/bus/xen'):
if 'xen' in __salt__['cmd.run']('dmesg').lower():
grains['virtual_subtype'] = 'Xen PV DomU'
elif os.listdir('/sys/bus/xen/drivers'):
# An actual DomU will have several drivers
# whereas a paravirt ops kernel will not.
grains['virtual_subtype'] = 'Xen PV DomU'
# If a Dom0 or DomU was detected, obviously this is xen
if 'dom' in grains.get('virtual_subtype', '').lower():
grains['virtual'] = 'xen'
if os.path.isfile('/proc/cpuinfo'):
if 'QEMU Virtual CPU' in salt.utils.fopen('/proc/cpuinfo', 'r').read():
grains['virtual'] = 'kvm'
elif osdata['kernel'] == 'FreeBSD':
kenv = salt.utils.which('kenv')
if kenv:
product = __salt__['cmd.run']('{0} smbios.system.product'.format(kenv))
maker = __salt__['cmd.run']('{0} smbios.system.maker'.format(kenv))
if product.startswith('VMware'):
grains['virtual'] = 'VMware'
if maker.startswith('Xen'):
grains['virtual_subtype'] = '{0} {1}'.format(maker, product)
grains['virtual'] = 'xen'
if sysctl:
model = __salt__['cmd.run']('{0} hw.model'.format(sysctl))
jail = __salt__['cmd.run']('{0} -n security.jail.jailed'.format(sysctl))
if jail == '1':
grains['virtual_subtype'] = 'jail'
if 'QEMU Virtual CPU' in model:
grains['virtual'] = 'kvm'
elif osdata['kernel'] == 'SunOS':