How to use the sarge.capture_both function in sarge

To help you get started, we’ve selected a few sarge examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github wrdrd / docs / wrdrd / tools / domain.py View on Github external
def check_google_spf(domain):
    """
    Check a Google SPF DNS TXT record

    Args:
        domain (str): DNS domain name

    Returns:
        int: 0 if OK, 1 on error

    | https://support.google.com/a/answer/178723?hl=en
    """
    cmd = sarge.shell_format("dig {0} txt +short", domain)
    log.info('cmd', op='check_google_spf', cmd=cmd)
    proc = sarge.capture_both(cmd)
    output = proc.stdout.text.rstrip().split('\n')
    for line in output:
        log.debug('TXT', record=line)
        expected = u"\"v=spf1 include:_spf.google.com ~all\""
        if line == expected:
            return 0

    errmsg = "%r != %r" % (output, expected)
    log.error('err', msg=errmsg)
    return 1
github wrdrd / docs / wrdrd / tools / domain.py View on Github external
"""
    Check a Google DMARC DNS TXT record

    Args:
        domain (str): DNS domain name

    Returns:
        int: 0 if OK, 1 on error

    | https://support.google.com/a/answer/2466580
    | https://support.google.com/a/answer/2466563
    """
    dmarc_domain = "_dmarc." + domain
    cmd = sarge.shell_format("dig {0} txt +short", dmarc_domain)
    log.info('cmd', op='check_google_dmarc', cmd=cmd)
    proc = sarge.capture_both(cmd)
    output = proc.stdout.text.rstrip().split('\n')
    for line in output:
        log.debug('TXT', record=line)
        expected = u"\"v=DMARC1"  # ... "\; p=none\; rua=mailto:"
        if line.startswith(expected):
            if 'p=' in line:
                return 0

    errmsg = "%r != %r" % (output, expected)
    log.error('err', msg=errmsg)
    return 1
github wrdrd / docs / wrdrd / tools / domain.py View on Github external
| https://support.google.com/a/answer/174126
    | https://admin.google.com/AdminHome?fral=1#AppDetails:service=email&flyout=dkim

    .. note:: This check function only finds "v=DKIM1" TXT records;
       it defaults to the default ``google`` prefix
       and **does not validate DKIM signatures**.

    | http://dkim.org/specs/rfc4871-dkimbase.html#rfc.section.3.6.2.1
    | http://dkim.org/specs/rfc4871-dkimbase.html#rfc.section.A.3

    """
    dkim_record_name = "%s._domainkey.%s" % (prefix, domain)
    cmd = sarge.shell_format("dig {0} txt +short", dkim_record_name)
    log.info('cmd', op='check_google_dkim', cmd=cmd)
    proc = sarge.capture_both(cmd)
    output = proc.stdout.text.rstrip().split('\n')
    for line in output:
        log.debug('TXT', record=line)
        expected = u"\"v=DKIM1"  # ... "\; p=none\; rua=mailto:"
        if line.startswith(expected):
            if 'k=' in line and 'p=' in line:
                return 0

    errmsg = "%s is not a valid DKIM record" % (output)
    log.error('err', msg=errmsg)
    return 1
github jlesquembre / autopilot / src / autopilot / pypi.py View on Github external
def _get_passwd(cmd):
    command = sarge.capture_both(cmd)
    command.wait()
    passwd = command.stdout.text.strip()
    if command.returncode != 0:
        raise FatalError(
            'Command "{}" failed\n'
            'We cannot get a password\n'
            'stdout:\n{}\n'
            'stderr:\n{}\n'
            .format(cmd, passwd, command.stderr.text.strip()))
    return passwd
github bzyx / precious / precious / worker / worker_utils.py View on Github external
def run_command_raw_output(cmd, input=None, async=False, **kwargs):
    timeout = kwargs.pop('timeout', None)
    ts = time.time()
    p = capture_both(cmd, input=input, async=async, **kwargs)
    return {"command": cmd,
            "returncode": p.returncode,
            "stdout": p.stdout.text,
            "stderr": p.stderr.text,
            "tstart": ts,
            "tstop": time.time()}
github wrdrd / docs / wrdrd / tools / domain.py View on Github external
def dig_mx(domain):
    """
    Get MX DNS records with dig

    Args:
        domain (str): DNS domain
    Returns:
        str: dig output

    | https://en.wikipedia.org/wiki/MX_record
    """
    cmd = sarge.shell_format(
        "dig {0} mx +cmd +nocomments +question +noidentify +nostats",
        domain)
    log.info('cmd', cmd=cmd)
    output = sarge.capture_both(cmd)
    return output
github eyal0 / OctoPrint-PrintTimeGenius / octoprint_PrintTimeGenius / __init__.py View on Github external
else:
      logger.info("Not running built-in analysis.")
    for analyzer in self._plugin._settings.get(["analyzers"]):
      command = analyzer["command"].format(gcode=self._current.absolute_path, mcodes=self._plugin.get_printer_config())
      if not analyzer["enabled"]:
        logger.info("Disabled: {}".format(command))
        continue
      logger.info("Running: {}".format(command))
      results_err = ""
      try:
        if parse_version(sarge.__version__) >= parse_version('0.1.5'):
          # Because in version 0.1.5 the name was changed in sarge.
          async_kwarg = 'async_'
        else:
          async_kwarg = 'async'
        sarge_job = sarge.capture_both(command, **{async_kwarg: True})
        # Wait for sarge to begin
        while not sarge_job.processes or not sarge_job.processes[0]:
          time.sleep(0.5)
        try:
          process = psutil.Process(sarge_job.processes[0].pid)
          for p in [process] + process.children(recursive=True):
            try:
              if "IDLE_PRIORITY_CLASS" in dir(psutil):
                p.nice(psutil.IDLE_PRIORITY_CLASS)
              else:
                p.nice(19)
            except psutil.NoSuchProcess:
              pass
        except psutil.NoSuchProcess:
          pass
        while sarge_job.commands[0].poll() is None:
github wrdrd / docs / wrdrd / tools / domain.py View on Github external
def nslookup(domain, nameserver=''):
    """
    Get nslookup information with nslookup (resolve a domainname to an IP)

    Args:
        domain (str): DNS domain
        nameserver (str): DNS domain name server to query (default: ``''``)
    Returns:
        str: nslookup output
    """
    if not domain.endswith('.'):
        domain = domain + '.'
    cmd = sarge.shell_format('nslookup {0} {1}', domain, nameserver)
    log.info('cmd', cmd=cmd)
    output = sarge.capture_both(cmd)
    return output
github wrdrd / docs / wrdrd / tools / domain.py View on Github external
def dig_txt(domain):
    """
    Get DNS TXT records with dig

    Args:
        domain (str): DNS domain
    Returns:
        str: dig output
    """
    cmd = sarge.shell_format(
        "dig {0} txt +cmd +nocomments +question +noidentify +nostats",
        domain)
    log.info('cmd', cmd=cmd)
    output = sarge.capture_both(cmd)
    return output

sarge

A wrapper for subprocess which provides command pipeline functionality.

BSD-2-Clause
Latest version published 3 years ago

Package Health Score

52 / 100
Full package analysis