Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_result_not_ascii(self):
    """Check an SOA query whose hostmaster field is returned as bytes.

    Queries the SOA record of ``ayesas.com`` and verifies that the query
    completes without error, that the result is exactly a
    ``pycares.ares_query_soa_result``, that ``hostmaster`` is ``bytes``
    and that the TTL is non-negative.
    """
    self.result, self.errorno = None, None

    def cb(result, errorno):
        # Stash the outcome on the test case so it can be inspected
        # once self.wait() returns.
        self.result, self.errorno = result, errorno

    self.channel.query('ayesas.com', pycares.QUERY_TYPE_SOA, cb)
    self.wait()
    self.assertNoError(self.errorno)
    self.assertEqual(type(self.result), pycares.ares_query_soa_result)
    self.assertIsInstance(self.result.hostmaster, bytes)  # it's not ASCII
    # assertGreaterEqual reports the offending TTL on failure, whereas
    # assertTrue(ttl >= 0) would only say "False is not true".
    self.assertGreaterEqual(self.result.ttl, 0)
def gethostbyname(self, name, cb, family=socket.AF_INET):
    """Start a host lookup for *name* on the underlying channel.

    Args:
        name: Host name to resolve.
        cb: Callback forwarded unchanged to the channel.
        family: Address family passed to the channel. Defaults to
            ``socket.AF_INET``, the value that used to be hard-coded, so
            existing two-argument callers behave exactly as before.
    """
    self._channel.gethostbyname(name, family, cb)
if __name__ == '__main__':
    # Demo: issue a few lookups through a pyuv-driven DNSResolver and
    # print whatever each callback receives.

    def query_cb(result, error):
        """Print the query result, then the error, one per line."""
        print(result, error, sep='\n')

    def gethostbyname_cb(result, error):
        """Print the host lookup result, then the error, one per line."""
        print(result, error, sep='\n')

    loop = pyuv.Loop.default_loop()
    resolver = DNSResolver(loop)

    # Queries are submitted in the listed order.
    for hostname, record_type in (
        ('google.com', pycares.QUERY_TYPE_A),
        ('sip2sip.info', pycares.QUERY_TYPE_SOA),
    ):
        resolver.query(hostname, record_type, query_cb)
    resolver.gethostbyname('apple.com', gethostbyname_cb)

    loop.run()
def gethostbyname(self, name, cb, family=socket.AF_INET):
    """Start a host lookup for *name* on the underlying channel.

    Args:
        name: Host name to resolve.
        cb: Callback forwarded unchanged to the channel.
        family: Address family passed to the channel. Defaults to
            ``socket.AF_INET``, the value that used to be hard-coded, so
            existing two-argument callers behave exactly as before.
    """
    self._channel.gethostbyname(name, family, cb)
if __name__ == '__main__':
    # Demo: fire several lookups through a DNSResolver, then call its
    # wait_channel() method. Each callback prints what it receives.

    def query_cb(result, error):
        """Print the query result, then the error, one per line."""
        print(result, error, sep='\n')

    def gethostbyname_cb(result, error):
        """Print the host lookup result, then the error, one per line."""
        print(result, error, sep='\n')

    resolver = DNSResolver()

    # Queries are submitted in the listed order.
    for hostname, record_type in (
        ('google.com', pycares.QUERY_TYPE_A),
        ('google.com', pycares.QUERY_TYPE_AAAA),
        ('facebook.com', pycares.QUERY_TYPE_A),
        ('sip2sip.info', pycares.QUERY_TYPE_SOA),
    ):
        resolver.query(hostname, record_type, query_cb)
    resolver.gethostbyname('apple.com', gethostbyname_cb)

    resolver.wait_channel()
continue
rlist, wlist, xlist = select.select(read_fds, write_fds, [], timeout)
for fd in rlist:
channel.process_fd(fd, pycares.ARES_SOCKET_BAD)
for fd in wlist:
channel.process_fd(pycares.ARES_SOCKET_BAD, fd)
if __name__ == '__main__':
    # Demo: use a bare pycares.Channel, hand it to wait_channel(), then
    # report completion.

    def cb(result, error):
        """Print the result, then the error, one per line."""
        print(result, error, sep='\n')

    channel = pycares.Channel()
    channel.gethostbyname('google.com', socket.AF_INET, cb)

    # Queries are submitted in the listed order, after the host lookup.
    for hostname, record_type in (
        ('google.com', pycares.QUERY_TYPE_A),
        ('sip2sip.info', pycares.QUERY_TYPE_SOA),
    ):
        channel.query(hostname, record_type, cb)

    wait_channel(channel)
    print('Done!')
# Names exported by a star-import of this module.
__all__ = ('DNSResolver', 'error')

# NOTE(review): presumably flags for read/write socket interest; their use
# is not visible in this part of the file -- confirm before relying on it.
READ = 1
WRITE = 2

# Maps a record-type mnemonic to the matching pycares.QUERY_TYPE_* constant.
query_type_map = {
    record: getattr(pycares, 'QUERY_TYPE_' + record)
    for record in (
        'A', 'AAAA', 'ANY', 'CNAME', 'MX', 'NAPTR',
        'NS', 'PTR', 'SOA', 'SRV', 'TXT',
    )
}
class DNSResolver:
def __init__(self, nameservers: Optional[List[str]] = None, loop: Optional[asyncio.AbstractEventLoop] = None,
**kwargs: Any) -> None:
self.loop = loop or asyncio.get_event_loop()
assert self.loop is not None
kwargs.pop('sock_state_cb', None)
self._channel = pycares.Channel(sock_state_cb=self._sock_state_cb, **kwargs)
if nameservers:
self.nameservers = nameservers
self._read_fds = set() # type: Set[int]
self._write_fds = set() # type: Set[int]