How to use the pycares.QUERY_TYPE_SOA constant in pycares

To help you get started, we’ve selected a few pycares examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github saghul / pycares / tests / tests.py View on Github external
def test_result_not_ascii(self):
        """Check an SOA query whose ``hostmaster`` field is not ASCII.

        Issues an SOA query for ``ayesas.com`` on ``self.channel``, waits for
        the callback to fire, then verifies that no error was reported, that
        the result is an ``ares_query_soa_result``, that ``hostmaster`` is
        delivered as ``bytes`` and that the TTL is non-negative.
        """
        # Reset the slots the callback fills in, so stale values from an
        # earlier test cannot satisfy the assertions below.
        self.result, self.errorno = None, None

        def cb(result, errorno):
            self.result, self.errorno = result, errorno

        self.channel.query('ayesas.com', pycares.QUERY_TYPE_SOA, cb)
        self.wait()
        self.assertNoError(self.errorno)
        self.assertEqual(type(self.result), pycares.ares_query_soa_result)
        self.assertIsInstance(self.result.hostmaster, bytes)  # it's not ASCII
        # assertGreaterEqual reports the actual TTL on failure, unlike
        # assertTrue(ttl >= 0) which only says "False is not true".
        self.assertGreaterEqual(self.result.ttl, 0)
github saghul / pycares / examples / cares-resolver.py View on Github external
def gethostbyname(self, name, cb):
        """Look up ``name`` via the wrapped channel, restricted to IPv4.

        Forwards to ``self._channel.gethostbyname`` with ``socket.AF_INET``
        as the address family; ``cb`` is handed through unchanged.
        """
        resolve = self._channel.gethostbyname
        resolve(name, socket.AF_INET, cb)


if __name__ == '__main__':
    # Demo entry point: fire a few lookups and print whatever comes back.
    def query_cb(result, error):
        # One value per line: the result first, then the error.
        print(result, error, sep='\n')

    def gethostbyname_cb(result, error):
        print(result, error, sep='\n')

    loop = pyuv.Loop.default_loop()
    resolver = DNSResolver(loop)
    lookups = (
        ('google.com', pycares.QUERY_TYPE_A),
        ('sip2sip.info', pycares.QUERY_TYPE_SOA),
    )
    for hostname, record_type in lookups:
        resolver.query(hostname, record_type, query_cb)
    resolver.gethostbyname('apple.com', gethostbyname_cb)
    loop.run()
github saghul / pycares / examples / cares-selectors.py View on Github external
def gethostbyname(self, name, cb):
        """Look up ``name`` via the wrapped channel, restricted to IPv4.

        Forwards to ``self._channel.gethostbyname`` with ``socket.AF_INET``
        as the address family; ``cb`` is handed through unchanged.
        """
        resolve = self._channel.gethostbyname
        resolve(name, socket.AF_INET, cb)


if __name__ == '__main__':
    # Demo entry point: fire a few lookups and print whatever comes back.
    def query_cb(result, error):
        # One value per line: the result first, then the error.
        print(result, error, sep='\n')

    def gethostbyname_cb(result, error):
        print(result, error, sep='\n')

    resolver = DNSResolver()
    lookups = (
        ('google.com', pycares.QUERY_TYPE_A),
        ('google.com', pycares.QUERY_TYPE_AAAA),
        ('facebook.com', pycares.QUERY_TYPE_A),
        ('sip2sip.info', pycares.QUERY_TYPE_SOA),
    )
    for hostname, record_type in lookups:
        resolver.query(hostname, record_type, query_cb)
    resolver.gethostbyname('apple.com', gethostbyname_cb)
    resolver.wait_channel()
github saghul / pycares / examples / cares-select.py View on Github external
continue
        rlist, wlist, xlist = select.select(read_fds, write_fds, [], timeout)
        for fd in rlist:
            channel.process_fd(fd, pycares.ARES_SOCKET_BAD)
        for fd in wlist:
            channel.process_fd(pycares.ARES_SOCKET_BAD, fd)


if __name__ == '__main__':
    # Demo entry point: one host lookup plus two record queries, all
    # reporting through the same callback.
    def cb(result, error):
        # One value per line: the result first, then the error.
        print(result, error, sep='\n')

    channel = pycares.Channel()
    channel.gethostbyname('google.com', socket.AF_INET, cb)
    lookups = (
        ('google.com', pycares.QUERY_TYPE_A),
        ('sip2sip.info', pycares.QUERY_TYPE_SOA),
    )
    for hostname, record_type in lookups:
        channel.query(hostname, record_type, cb)
    wait_channel(channel)
    print('Done!')
github saghul / aiodns / aiodns / __init__.py View on Github external
__all__ = ('DNSResolver', 'error')


# NOTE(review): presumably flags distinguishing read from write socket
# events -- their usage is not visible in this excerpt; confirm.
READ = 1
WRITE = 2

# Textual DNS record type name -> matching pycares query-type constant.
query_type_map = {
    'A': pycares.QUERY_TYPE_A,
    'AAAA': pycares.QUERY_TYPE_AAAA,
    'ANY': pycares.QUERY_TYPE_ANY,
    'CNAME': pycares.QUERY_TYPE_CNAME,
    'MX': pycares.QUERY_TYPE_MX,
    'NAPTR': pycares.QUERY_TYPE_NAPTR,
    'NS': pycares.QUERY_TYPE_NS,
    'PTR': pycares.QUERY_TYPE_PTR,
    'SOA': pycares.QUERY_TYPE_SOA,
    'SRV': pycares.QUERY_TYPE_SRV,
    'TXT': pycares.QUERY_TYPE_TXT,
}


class DNSResolver:
    def __init__(self, nameservers: Optional[List[str]] = None, loop: Optional[asyncio.AbstractEventLoop] = None,
                 **kwargs: Any) -> None:
        self.loop = loop or asyncio.get_event_loop()
        assert self.loop is not None
        kwargs.pop('sock_state_cb', None)
        self._channel = pycares.Channel(sock_state_cb=self._sock_state_cb, **kwargs)
        if nameservers:
            self.nameservers = nameservers
        self._read_fds = set() # type: Set[int]
        self._write_fds = set() # type: Set[int]