Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_strerror_str(self):
for key in pycares.errno.errorcode:
self.assertTrue(type(pycares.errno.strerror(key)), str)
def resolve(self, host, port, family=0):
if is_valid_ip(host):
addresses = [host]
else:
# gethostbyname doesn't take callback as a kwarg
self.channel.gethostbyname(host, family, (yield gen.Callback(1)))
callback_args = yield gen.Wait(1)
assert isinstance(callback_args, gen.Arguments)
assert not callback_args.kwargs
result, error = callback_args.args
if error:
raise Exception('C-Ares returned error %s: %s while resolving %s' %
(error, pycares.errno.strerror(error), host))
addresses = result.addresses
addrinfo = []
for address in addresses:
if '.' in address:
address_family = socket.AF_INET
elif ':' in address:
address_family = socket.AF_INET6
else:
address_family = socket.AF_UNSPEC
if family != socket.AF_UNSPEC and family != address_family:
raise Exception('Requested socket family %d but got %d' %
(family, address_family))
addrinfo.append((address_family, (address, port)))
raise gen.Return(addrinfo)
def _ares_cb(self, cb, result, error):
if error is not None:
error_data = self._ares_errno_map.get(error)
if not error_data:
exc = socket.gaierror(error, pycares.errno.strerror(error))
else:
klass, args = error_data
exc = klass(*args)
cb(Result(None, exc))
else:
cb(Result(result, None))
def cb(result, error):
if error is not None:
print('Error: (%d) %s' % (error, pycares.errno.strerror(error)))
else:
parts = [
';; QUESTION SECTION:',
';%s\t\t\tIN\t%s' % (hostname, qtype.upper()),
'',
';; ANSWER SECTION:'
]
if not isinstance(result, collections.abc.Iterable):
result = [result]
for r in result:
txt = '%s\t\t%d\tIN\t%s' % (hostname, r.ttl, r.type)
if r.type in ('A', 'AAAA'):
parts.append('%s\t%s' % (txt, r.host))
elif r.type == 'CNAME':
def _getnameinfo(self, future, sockaddr, result, error):
if error:
future.set_exception(
Exception('C-Ares returned error %s: %s while resolving %s' %
(error, pycares.errno.strerror(error),
str(sockaddr))))
else:
service = result.service
if not service:
service = 'https' if sockaddr[1] == 443 else 'http'
future.set_result((result.node, service))