Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Fragment: provision a PXE-booted VM by building and running a
# `virt-install` command line from sizing values already in scope
# (memory, cpu_count, disk_size, vnc_password).
# libvirt wants the memory in MB
# NOTE(review): presumably `memory` arrives here in GB, hence the
# multiplication by 1024 -- confirm against the caller.
memory *= 1024
# The VM name is the deployment id joined to a suffix with a dash.
vm_name = '{}-{}'.format(rhci.deployment_id, vm_suffix)
# The following is mostly copied out of the deploy from iso script,
# and should probably be generalized in rhci_common (or something)
print 'Provisioning RHCI VM {} via libvirt'.format(vm_name)
# Values substituted into the named placeholders of the command
# template below; the private network is hard-coded to bridge br1.
shell_args = {
'vm_name': vm_name,
'memory': memory,
'cpus': cpu_count,
'disk_size': disk_size,
'private_net_config': 'bridge:br1',
'vnc_password': vnc_password
}
# Build the command: the VM boots from the network first (--pxe,
# --boot net,hd), VNC listens on all interfaces (0.0.0.0) with the given
# password, and --noautoconsole stops virt-install attaching a console.
cmd = sarge.shell_format('virt-install'
' -n {vm_name}'
' --os-variant=rhel7'
' --ram {memory}'
' --pxe'
' --vcpus {cpus}'
' --disk bus="virtio,size={disk_size}"'
' --network {private_net_config}'
' --boot net,hd'
' --graphics "vnc,listen=0.0.0.0,password={vnc_password}"'
' --noautoconsole',
**shell_args)
# Run the command, capturing both stdout and stderr.
proc = sarge.capture_both(cmd)
if proc.returncode != 0:
# On failure, report it and relay virt-install's captured stderr.
# NOTE(review): no exit/raise is visible in this fragment -- confirm
# whether the caller is expected to stop here.
print >>sys.stderr, 'virt-install failed'
print >>sys.stderr, proc.stderr.read()
# Fragment: provision a VM that installs from a downloaded ISO image by
# building and running a `virt-install` command line.
# virt-install the ISO we just downloaded
print 'Provisioning RHCI VM {} via libvirt'.format(vm_name)
# Values substituted into the named placeholders of the command
# template below. Sizing is hard-coded here: 12288 for --ram, 4 vcpus
# and a disk size of 100 allocated from the given libvirt storage pool.
shell_args = {
'vm_name': vm_name,
'memory': 12288,
'cpus': 4,
'libvirt_pool': libvirt_storage_pool,
'disk_size': 100,
'public_net_config': public_net_config,
'private_net_config': private_net_config,
'image_path': image_path,
'vnc_password': vnc_password
}
# Build the command: the ISO is attached as a cdrom, two networks are
# attached (public first, then private), boot order is disk then cdrom,
# and VNC listens on all interfaces (0.0.0.0) with the given password.
cmd = sarge.shell_format('virt-install'
' -n {vm_name}'
' --os-variant=rhel7'
' --ram {memory}'
' --vcpus {cpus}'
' --disk bus="virtio,pool={libvirt_pool},size={disk_size}"'
' --cdrom {image_path}'
' --network {public_net_config}'
' --network {private_net_config}'
' --boot "hd,cdrom"'
' --graphics "vnc,listen=0.0.0.0,password={vnc_password}"'
' --noautoconsole',
**shell_args)
# this will block while virt-install runs
proc = sarge.capture_both(cmd)
# NOTE(review): the body of this failure branch is not visible in this
# excerpt -- confirm how a non-zero exit status is handled.
if proc.returncode != 0:
def check_google_mx(domain):
    """
    Check that the MX DNS records of a domain point at Google

    Runs ``dig <domain> mx +short`` and verifies that every returned
    record ends with one of the expected Google mail hosts.

    Args:
        domain (str): DNS domain name
    Returns:
        int: 0 if OK, otherwise the number of problems found
    | https://support.google.com/a/topic/2716885?hl=en&ref_topic=2426592
    """
    cmd = sarge.shell_format("dig {0} mx +short", domain)
    log.info('cmd', cmd=cmd)
    output = sarge.capture_both(cmd).stdout.text.rstrip()
    log.debug('MX', record=output)
    result = 0
    check_domain1 = "aspmx.l.google.com."
    check_domain2 = "googlemail.com."
    # ''.split('\n') yields [''] rather than [], which made the "no
    # records" branch unreachable; drop blank lines so empty dig output
    # is detected as such.
    lines = [line.strip() for line in output.splitlines() if line.strip()]
    if not lines:
        log.error('err', msg="No MX records found for %r" % domain)
        result += 1
    for line in lines:
        line = line.lower()
        # str.endswith accepts a tuple of candidate suffixes
        if not line.endswith((check_domain1, check_domain2)):
            result += 1
            log.error('err', msg="%r does not end with %r or %r" %
                      (line, check_domain1, check_domain2))
    # Hand the error count back to the caller, as the docstring promises
    return result
def dig_mx(domain):
    """
    Look up the MX DNS records of a domain with dig

    Args:
        domain (str): DNS domain
    Returns:
        the object returned by ``sarge.capture_both`` for the dig
        command (not a plain str; the captured output is read from it)
    | https://en.wikipedia.org/wiki/MX_record
    """
    dig_template = "dig {0} mx +cmd +nocomments +question +noidentify +nostats"
    dig_cmd = sarge.shell_format(dig_template, domain)
    log.info('cmd', cmd=dig_cmd)
    return sarge.capture_both(dig_cmd)
def exposed_mkdir(self, dir_):
    """
    Create the directory ``dir_`` by running ``mkdir`` with ``self.cwd``
    as the working directory.

    Returns whatever ``run_command_forget_output`` returns.
    """
    logger.debug("Creating directory {0}".format(dir_))
    mkdir_cmd = shell_format('mkdir {0}', dir_)
    logger.debug("Creating formated {0}".format(mkdir_cmd))
    outcome = run_command_forget_output(mkdir_cmd, cwd=self.cwd)
    return outcome
def start_python_process(component_name, run_in_fg, py_path, program_dir, on_keyboard_interrupt=None, failed_to_start_err=-100,
                         extra_options=None, stderr_path=None, stdin_data=None):
    """ Starts a new process from a given Python path, either in background or foreground (run_in_fg).

    The command line handed to start_process is built as
    ``-m <py_path> [<program_dir>] <options>`` where <options> are ``key=value``
    pairs joined with CLI_ARG_SEP. The pairs always include ``fg`` and are extended
    (or overridden) by extra_options, if given. program_dir is left out when it is empty.

    Returns whatever start_process returns.
    """
    options = {
        'fg': run_in_fg,
    }
    if extra_options:
        options.update(extra_options)

    # Serialize under a separate name instead of rebinding the dict to a string
    cli_options = CLI_ARG_SEP.join('{}={}'.format(k, v) for k, v in options.items())

    py_path_option = shell_format('-m {0}', py_path)
    program_dir_option = shell_format('{0}', program_dir) if program_dir else ''

    # Only include the program directory when there is one, so that no stray
    # double space ends up in the command line (the previous unconditional
    # three-part format was a dead store that was immediately overwritten).
    cli_parts = [py_path_option]
    if program_dir_option:
        cli_parts.append(program_dir_option)
    cli_parts.append(cli_options)
    extra_cli_options = ' '.join(cli_parts)

    return start_process(component_name, get_executable(), run_in_fg, None, extra_cli_options, on_keyboard_interrupt,
        failed_to_start_err, extra_options, stderr_path, stdin_data)
def set_default_org(self, name):
    """ set the default org for tasks by name key

    Looks the org up by name, clears any existing default, flags this org's
    config with ``"default": True`` and stores it back. When the org reports
    itself as created, sfdx's ``defaultusername`` config is also pointed at
    the org's sfdx alias.
    """
    org = self.get_org(name)
    # Clear the previous default before flagging the new one
    self.unset_default_org()
    org.config["default"] = True
    self.set_org(org)
    if org.created:
        # The call was previously left unterminated; both parentheses are
        # now closed so the statement is complete.
        sfdx(
            sarge.shell_format(
                "force:config:set defaultusername={}", org.sfdx_alias
            )
        )
def dig_dnskey(zone):
    """
    Look up the DNSKEY (DNSSEC) records of a zone with dig

    Args:
        zone (str): DNS zone
    Returns:
        the object returned by ``sarge.capture_both`` for the dig
        command (not a plain str; the captured output is read from it)
    """
    dig_template = (
        "dig {0} +dnssec dnskey +cmd +nocomments +question +noidentify +nostats"
    )
    dig_cmd = sarge.shell_format(dig_template, zone)
    log.info('cmd', cmd=dig_cmd)
    return sarge.capture_both(dig_cmd)
def whois(domain):
    """
    Run the ``whois`` command for a domain

    Args:
        domain (str): DNS domain
    Returns:
        the object returned by ``sarge.capture_both`` for the whois
        command (not a plain str; the captured output is read from it)
    """
    whois_cmd = sarge.shell_format('whois {0}', domain)
    log.info('cmd', cmd=whois_cmd)
    return sarge.capture_both(whois_cmd)
def get_git_config(config_key):
    """
    Read a single value from git's configuration

    Runs ``git config --get <config_key>`` through the shell and decodes
    the captured stdout using ``sys.stdout.encoding``.

    Args:
        config_key (str): git configuration key, e.g. ``user.email``
    Returns:
        str or None: the stripped value, or None when the value is empty
        or git exited with a non-zero status
    """
    p = sarge.Command(
        # Use the plain "{0}" field so shell_format quotes the key. The
        # previous '"{0!s}"' form switched quoting off, so a key holding
        # '"', '$' or backticks was interpreted by the shell.
        sarge.shell_format('git config --get {0}', config_key),
        stderr=sarge.Capture(buffer_size=-1),
        stdout=sarge.Capture(buffer_size=-1),
        shell=True,
    )
    p.run()
    config_value = (
        io.TextIOWrapper(p.stdout, encoding=sys.stdout.encoding).read().strip()
    )
    return config_value if config_value and not p.returncode else None