Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def check_google_spf(domain):
"""
Check a Google SPF DNS TXT record
Args:
domain (str): DNS domain name
Returns:
int: 0 if OK, 1 on error
| https://support.google.com/a/answer/178723?hl=en
"""
cmd = sarge.shell_format("dig {0} txt +short", domain)
log.info('cmd', op='check_google_spf', cmd=cmd)
proc = sarge.capture_both(cmd)
output = proc.stdout.text.rstrip().split('\n')
for line in output:
log.debug('TXT', record=line)
expected = u"\"v=spf1 include:_spf.google.com ~all\""
if line == expected:
return 0
errmsg = "%r != %r" % (output, expected)
log.error('err', msg=errmsg)
return 1
"""
Check a Google DMARC DNS TXT record
Args:
domain (str): DNS domain name
Returns:
int: 0 if OK, 1 on error
| https://support.google.com/a/answer/2466580
| https://support.google.com/a/answer/2466563
"""
dmarc_domain = "_dmarc." + domain
cmd = sarge.shell_format("dig {0} txt +short", dmarc_domain)
log.info('cmd', op='check_google_dmarc', cmd=cmd)
proc = sarge.capture_both(cmd)
output = proc.stdout.text.rstrip().split('\n')
for line in output:
log.debug('TXT', record=line)
expected = u"\"v=DMARC1" # ... "\; p=none\; rua=mailto:"
if line.startswith(expected):
if 'p=' in line:
return 0
errmsg = "%r != %r" % (output, expected)
log.error('err', msg=errmsg)
return 1
| https://support.google.com/a/answer/174126
| https://admin.google.com/AdminHome?fral=1#AppDetails:service=email&flyout=dkim
.. note:: This check function only finds "v=DKIM1" TXT records;
it defaults to the default ``google`` prefix
and **does not validate DKIM signatures**.
| http://dkim.org/specs/rfc4871-dkimbase.html#rfc.section.3.6.2.1
| http://dkim.org/specs/rfc4871-dkimbase.html#rfc.section.A.3
"""
dkim_record_name = "%s._domainkey.%s" % (prefix, domain)
cmd = sarge.shell_format("dig {0} txt +short", dkim_record_name)
log.info('cmd', op='check_google_dkim', cmd=cmd)
proc = sarge.capture_both(cmd)
output = proc.stdout.text.rstrip().split('\n')
for line in output:
log.debug('TXT', record=line)
expected = u"\"v=DKIM1" # ... "\; p=none\; rua=mailto:"
if line.startswith(expected):
if 'k=' in line and 'p=' in line:
return 0
errmsg = "%s is not a valid DKIM record" % (output)
log.error('err', msg=errmsg)
return 1
def _get_passwd(cmd):
command = sarge.capture_both(cmd)
command.wait()
passwd = command.stdout.text.strip()
if command.returncode != 0:
raise FatalError(
'Command "{}" failed\n'
'We cannot get a password\n'
'stdout:\n{}\n'
'stderr:\n{}\n'
.format(cmd, passwd, command.stderr.text.strip()))
return passwd
def run_command_raw_output(cmd, input=None, async=False, **kwargs):
timeout = kwargs.pop('timeout', None)
ts = time.time()
p = capture_both(cmd, input=input, async=async, **kwargs)
return {"command": cmd,
"returncode": p.returncode,
"stdout": p.stdout.text,
"stderr": p.stderr.text,
"tstart": ts,
"tstop": time.time()}
def dig_mx(domain):
"""
Get MX DNS records with dig
Args:
domain (str): DNS domain
Returns:
str: dig output
| https://en.wikipedia.org/wiki/MX_record
"""
cmd = sarge.shell_format(
"dig {0} mx +cmd +nocomments +question +noidentify +nostats",
domain)
log.info('cmd', cmd=cmd)
output = sarge.capture_both(cmd)
return output
else:
logger.info("Not running built-in analysis.")
for analyzer in self._plugin._settings.get(["analyzers"]):
command = analyzer["command"].format(gcode=self._current.absolute_path, mcodes=self._plugin.get_printer_config())
if not analyzer["enabled"]:
logger.info("Disabled: {}".format(command))
continue
logger.info("Running: {}".format(command))
results_err = ""
try:
if parse_version(sarge.__version__) >= parse_version('0.1.5'):
# Because in version 0.1.5 the name was changed in sarge.
async_kwarg = 'async_'
else:
async_kwarg = 'async'
sarge_job = sarge.capture_both(command, **{async_kwarg: True})
# Wait for sarge to begin
while not sarge_job.processes or not sarge_job.processes[0]:
time.sleep(0.5)
try:
process = psutil.Process(sarge_job.processes[0].pid)
for p in [process] + process.children(recursive=True):
try:
if "IDLE_PRIORITY_CLASS" in dir(psutil):
p.nice(psutil.IDLE_PRIORITY_CLASS)
else:
p.nice(19)
except psutil.NoSuchProcess:
pass
except psutil.NoSuchProcess:
pass
while sarge_job.commands[0].poll() is None:
def nslookup(domain, nameserver=''):
"""
Get nslookup information with nslookup (resolve a domainname to an IP)
Args:
domain (str): DNS domain
nameserver (str): DNS domain name server to query (default: ``''``)
Returns:
str: nslookup output
"""
if not domain.endswith('.'):
domain = domain + '.'
cmd = sarge.shell_format('nslookup {0} {1}', domain, nameserver)
log.info('cmd', cmd=cmd)
output = sarge.capture_both(cmd)
return output
def dig_txt(domain):
"""
Get DNS TXT records with dig
Args:
domain (str): DNS domain
Returns:
str: dig output
"""
cmd = sarge.shell_format(
"dig {0} txt +cmd +nocomments +question +noidentify +nostats",
domain)
log.info('cmd', cmd=cmd)
output = sarge.capture_both(cmd)
return output