Secure your code as it's written. Use Snyk Code to scan source code in minutes — no build needed — and fix issues immediately.
def getnameinfo(self, ip_port, flags, callback):
if not callable(callback):
raise AresError("a callable is required")
ip, port = ip_port
if port < 0 or port > 65535:
raise ValueError("port must be between 0 and 65535")
sa4 = _ffi.new("struct sockaddr_in*")
sa6 = _ffi.new("struct sockaddr_in6*")
if 1 == _lib.ares_inet_pton(socket.AF_INET, s2b(ip), _ffi.addressof(sa4.sin_addr)):
sa4.sin_family = socket.AF_INET
sa4.sin_port = socket.htons(port)
sa = sa4
elif 1 == _lib.ares_inet_pton(socket.AF_INET6, s2b(ip), _ffi.addressof(sa6.sin6_addr)):
sa6.sin6_family = socket.AF_INET6
sa6.sin6_port = socket.htons(port)
def wrapper(self, *args, **kwds):
    # Forward the call to the wrapped callable `f` (bound in the enclosing
    # scope) only while the instance still holds a truthy channel.
    if self.channel:
        return f(self, *args, **kwds)
    # A falsy channel means there is nothing left to operate on.
    raise AresError("Channel has already been destroyed")
return wrapper
def _set_servers(self, servers):
    """Replace the channel's name servers with ``servers``.

    Builds a linked list of ``struct ares_addr_node`` entries, one per
    address, and hands it to ``ares_set_servers``.

    Args:
        servers: sized sequence of textual IPv4 or IPv6 addresses. The
            visible call site only passes a non-empty sequence.

    Raises:
        ValueError: if an entry parses as neither IPv4 nor IPv6.
        AresError: if ``ares_set_servers`` does not return ``ARES_SUCCESS``;
            the message is the c-ares description of the status code.
    """
    # One contiguous array backs every node; the nodes are chained through
    # their ``next`` pointers below.
    nodes = _ffi.new("struct ares_addr_node[%d]" % len(servers))
    for i, server in enumerate(servers):
        address = s2b(server)  # encode once, try both address families
        if 1 == _lib.ares_inet_pton(socket.AF_INET, address, _ffi.addressof(nodes[i].addr.addr4)):
            nodes[i].family = socket.AF_INET
        elif 1 == _lib.ares_inet_pton(socket.AF_INET6, address, _ffi.addressof(nodes[i].addr.addr6)):
            nodes[i].family = socket.AF_INET6
        else:
            raise ValueError("invalid IP address")
        if i > 0:
            # Link the previous node to this one.
            nodes[i - 1].next = _ffi.addressof(nodes[i])
    r = _lib.ares_set_servers(self.channel, nodes)
    if r != _lib.ARES_SUCCESS:
        # Report the c-ares status instead of raising an empty error,
        # matching how _get_servers reports failures.
        raise AresError(errno.strerror(r))
def _get_servers(self):
    """Return the channel's configured name servers as a list of addresses.

    Calls ``ares_get_servers`` and walks the resulting linked list,
    converting each node's address to text with ``ares_inet_ntop``. Nodes
    whose address cannot be converted are skipped.

    Raises:
        AresError: if ``ares_get_servers`` does not return ``ARES_SUCCESS``.
    """
    servers = _ffi.new("struct ares_addr_node **")
    r = _lib.ares_get_servers(self.channel, servers)
    if r != _lib.ARES_SUCCESS:
        raise AresError(errno.strerror(r))
    server_list = []
    try:
        # servers[0] is the list head (NULL when no servers are set). Walk
        # the node pointers directly so field access and the NULL check
        # operate on ``struct ares_addr_node *`` values.
        node = servers[0]
        while node != _ffi.NULL:
            ip = _ffi.new("char []", _lib.INET6_ADDRSTRLEN)
            if _ffi.NULL != _lib.ares_inet_ntop(node.family, _ffi.addressof(node.addr), ip, _lib.INET6_ADDRSTRLEN):
                # NOTE(review): `_ffi_string` is not defined in the visible
                # code; presumably a module-level helper around
                # `_ffi.string` -- confirm.
                server_list.append(_ffi_string(ip, _lib.INET6_ADDRSTRLEN))
            node = node.next
    finally:
        # The list is allocated by c-ares and must be released by the caller.
        # NOTE(review): assumes `ares_free_data` is exposed by the cffi
        # bindings -- confirm.
        _lib.ares_free_data(servers[0])
    return server_list
if udp_port != -1:
options.udp_port = udp_port
optmask = optmask | _lib.ARES_OPT_UDP_PORT
if socket_send_buffer_size != -1:
options.socket_send_buffer_size = socket_send_buffer_size
optmask = optmask | _lib.ARES_OPT_SOCK_SNDBUF
if socket_receive_buffer_size != -1:
options.socket_receive_buffer_size = socket_receive_buffer_size
optmask = optmask | _lib.ARES_OPT_SOCK_RCVBUF
if sock_state_cb:
if not callable(sock_state_cb):
raise AresError("sock_state_cb is not callable")
userdata = _ffi.new_handle(sock_state_cb)
_global_set.add(userdata) # must keep this alive!
options.sock_state_cb = _sock_state_cb
options.sock_state_cb_data = userdata
optmask = optmask | _lib.ARES_OPT_SOCK_STATE_CB
if lookups:
options.lookups = lookups
optmask = optmask | _lib.ARES_OPT_LOOKUPS
if domains:
strs = [_ffi.new("char[]", s2b(i)) for i in domains]
c = _ffi.new("char *[%d]" % (len(domains) + 1))
for i in range(len(domains)):
c[i] = strs[i]
if domains:
strs = [_ffi.new("char[]", s2b(i)) for i in domains]
c = _ffi.new("char *[%d]" % (len(domains) + 1))
for i in range(len(domains)):
c[i] = strs[i]
options.domains = c
options.ndomains = len(domains)
optmask = optmask | _lib.ARES_OPT_DOMAINS
if rotate == True:
optmask = optmask | _lib.ARES_OPT_ROTATE
r = _lib.ares_init_options(channel, options, optmask)
if r != _lib.ARES_SUCCESS:
raise AresError()
self.channel = channel[0]
if servers:
self._set_servers(servers)
if local_ip:
self.set_local_ip(local_ip)
if local_dev:
self.set_local_dev(local_dev)