Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_channel_local_ip(self):
    """Query through a channel created with ``local_ip='127.0.0.1'``.

    The channel targets the single server ``8.8.8.8``. The test expects
    the callback to receive no result and the ``ARES_ECONNREFUSED``
    error code.
    """
    self.result = None
    self.errorno = None

    def on_answer(result, errorno):
        # Stash the outcome on the test case so the assertions can read it.
        self.result = result
        self.errorno = errorno

    self.channel = pycares.Channel(
        timeout=5.0,
        tries=1,
        servers=['8.8.8.8'],
        local_ip='127.0.0.1',
    )
    self.channel.query('google.com', pycares.QUERY_TYPE_A, on_answer)
    # NOTE(review): self.wait() is defined elsewhere; presumably it drives
    # the channel until the pending query has completed -- confirm.
    self.wait()
    self.assertEqual(self.result, None)
    self.assertEqual(self.errorno, pycares.errno.ARES_ECONNREFUSED)
def test_channel_nameservers(self):
    """Issue an A query for google.com on a channel using server 8.8.8.8.

    The error code handed to the callback is passed to
    ``self.assertNoError``; the result itself is stored but not checked.
    """
    self.result = None
    self.errorno = None

    def on_answer(result, errorno):
        # Record both callback arguments on the test case.
        self.result = result
        self.errorno = errorno

    self.channel = pycares.Channel(
        timeout=5.0,
        tries=1,
        servers=['8.8.8.8'],
    )
    self.channel.query('google.com', pycares.QUERY_TYPE_A, on_answer)
    # NOTE(review): self.wait() is defined elsewhere; presumably it blocks
    # until the query callback has fired -- confirm.
    self.wait()
    self.assertNoError(self.errorno)
def test_lookup(self):
channel = pycares.Channel(
lookups="b",
timeout=1,
tries=1,
socket_receive_buffer_size=4096,
servers=["8.8.8.8", "8.8.4.4"],
tcp_port=53,
udp_port=53,
rotate=True,
)
def on_result(result, errorno):
self.result, self.errorno = result, errorno
for domain in [
"google.com",
"microsoft.com",
def setUp(self):
    """Create ``self.channel`` with ``timeout=5.0`` and ``tries=1``."""
    channel_options = {'timeout': 5.0, 'tries': 1}
    self.channel = pycares.Channel(**channel_options)
def __init__(self, loop):
    """Set up the channel, timer and fd map for the given loop.

    Args:
        loop: Loop object stored on the instance and handed to
            ``pyuv.Timer``.
    """
    # The channel is created first, with this object's _sock_state_cb
    # registered as its socket-state callback.
    state_callback = self._sock_state_cb
    self._channel = pycares.Channel(sock_state_cb=state_callback)
    self.loop = loop
    self._timer = pyuv.Timer(loop)
    # Starts empty; NOTE(review): presumably keyed by file descriptor,
    # judging by the name -- confirm against _sock_state_cb.
    self._fd_map = {}
timeout = channel.timeout()
if not timeout:
channel.process_fd(pycares.ARES_SOCKET_BAD, pycares.ARES_SOCKET_BAD)
continue
rlist, wlist, xlist = select.select(read_fds, write_fds, [], timeout)
for fd in rlist:
channel.process_fd(fd, pycares.ARES_SOCKET_BAD)
for fd in wlist:
channel.process_fd(pycares.ARES_SOCKET_BAD, fd)
if __name__ == '__main__':
    # Demo entry point: start one gethostbyname lookup and two queries on a
    # single channel, hand the channel to wait_channel(), then report
    # completion.

    def cb(result, error):
        # Print both callback arguments, one per line.
        for item in (result, error):
            print(item)

    channel = pycares.Channel()
    channel.gethostbyname('google.com', socket.AF_INET, cb)
    for hostname, record_type in (
        ('google.com', pycares.QUERY_TYPE_A),
        ('sip2sip.info', pycares.QUERY_TYPE_SOA),
    ):
        channel.query(hostname, record_type, cb)
    wait_channel(channel)
    print('Done!')
def resolve_cares(name):
# create new c-ares channel
careschan = pycares.Channel(timeout=DNS_TIMEOUT, tries=1)
# if we don't get a response we return the default value
result = Resultholder()
result.value = DNS_TIMEOUT_VALUE
def setresult_cb(res, error):
# ignore error and just take first result ip (randomized anyway)
if res and res.addresses:
result.value = res.addresses[0]
# resolve with cb
careschan.gethostbyname(name, socket.AF_INET, setresult_cb)
# now do the actual work
readfds, writefds = careschan.getsock()
canreadfds, canwritefds, _ = select.select(readfds, writefds, [],
def initialize(self, io_loop=None):
    """Bind the instance to an IOLoop and create its channel.

    Args:
        io_loop: Loop to use; when falsy (the default ``None``),
            ``IOLoop.current()`` is used instead.
    """
    if io_loop:
        self.io_loop = io_loop
    else:
        self.io_loop = IOLoop.current()
    # Register this object's _sock_state_cb as the channel's socket-state
    # callback.
    state_callback = self._sock_state_cb
    self.channel = pycares.Channel(sock_state_cb=state_callback)
    # Starts empty; NOTE(review): presumably tracks the channel's file
    # descriptors -- confirm against _sock_state_cb.
    self.fds = {}
def __init__(self):
    """Create the instance's channel with ``timeout=5.0`` and ``tries=2``."""
    channel = pycares.Channel(timeout=5.0, tries=2)
    self.channel = channel