Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Public API of this module: only ``Resolver`` is exported by a star-import.
__all__ = ['Resolver']
import pycares
import pyuv
import socket
from collections import namedtuple
from functools import partial
from gevent.hub import get_hub, Waiter
# Two-field immutable record: ``value`` and ``exception``.
# NOTE(review): how instances are produced and consumed is not visible in
# this part of the file.
Result = namedtuple('Result', 'value exception')
class Resolver(object):
# Pairs of (socket-module NI_* flag, corresponding pycares ARES_NI_* flag).
# When the socket module does not define a given NI_* constant, the literal
# given as the getattr() default is used in its place.
_ares_flag_map = [
    (getattr(socket, 'NI_NUMERICHOST', 1), pycares.ARES_NI_NUMERICHOST),
    (getattr(socket, 'NI_NUMERICSERV', 2), pycares.ARES_NI_NUMERICSERV),
    (getattr(socket, 'NI_NOFQDN', 4), pycares.ARES_NI_NOFQDN),
    (getattr(socket, 'NI_NAMEREQD', 8), pycares.ARES_NI_NAMEREQD),
    (getattr(socket, 'NI_DGRAM', 16), pycares.ARES_NI_DGRAM),
]

# pycares error code -> (exception class, exception args).
# Both "not found" and "no data" map to the same gaierror description.
_ares_errno_map = dict.fromkeys(
    (pycares.errno.ARES_ENOTFOUND, pycares.errno.ARES_ENODATA),
    (socket.gaierror, (8, 'nodename nor servname provided, or not known'))
)

# Alternative pycares mapping in which "not found" becomes socket.herror.
# NOTE(review): which lookups use this table instead of _ares_errno_map is
# not visible here -- confirm against the callers.
_ares_errno_map2 = {
    pycares.errno.ARES_ENOTFOUND: (socket.herror, (1, 'Unknown host')),
    pycares.errno.ARES_ENODATA: (socket.gaierror, (8, 'nodename nor servname provided, or not known')),
}

# pyuv error code -> (exception class, exception args).
_addrinfo_errno_map = {
    pyuv.errno.UV_ENOENT: (socket.gaierror, (8, 'nodename nor servname provided, or not known')),
}
def __init__(self, hub=None):
    """Create the resolver's channel, timer and fd map.

    :param hub: hub to attach to. Any falsy value (including the default
        ``None``) means the result of ``get_hub()`` is used instead.
    """
    if not hub:
        hub = get_hub()
    self.hub = hub
    # This instance's ``_sock_state_cb`` method is registered as the
    # channel's ``sock_state_cb`` callback.
    self._channel = pycares.Channel(sock_state_cb=self._sock_state_cb)
    # Timer created on the ``_loop`` object of the hub's loop.
    self._timer = pyuv.Timer(self.hub.loop._loop)
    # Starts empty.  NOTE(review): presumably keyed by socket file
    # descriptor -- confirm against ``_sock_state_cb``.
    self._fd_map = {}