Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_idna_encoding_query_a(self):
    """Query an internationalised host name as UTF-8 bytes and as ``str``.

    The byte-string form is expected to finish with ``ARES_ENOTFOUND`` and
    no result. The ``str`` form is expected to finish without error and to
    yield ``ares_query_a_result`` entries whose ``host`` is not ``None``.
    """
    hostname = 'españa.icom.museum'
    self.result = None
    self.errorno = None

    def on_answer(result, errorno):
        # Stash the callback arguments on the test case for the assertions.
        self.result = result
        self.errorno = errorno

    # try encoding it as utf-8
    utf8_name = hostname.encode()
    self.channel.query(utf8_name, pycares.QUERY_TYPE_A, on_answer)
    self.wait()
    self.assertEqual(self.errorno, pycares.errno.ARES_ENOTFOUND)
    self.assertEqual(self.result, None)

    # use it as is (it's IDNA encoded internally)
    self.channel.query(hostname, pycares.QUERY_TYPE_A, on_answer)
    self.wait()
    self.assertNoError(self.errorno)
    for record in self.result:
        self.assertEqual(type(record), pycares.ares_query_a_result)
        self.assertNotEqual(record.host, None)
def test_attempt_no_mx(self):
    """Check ``MxSmtpRelay.attempt`` when the MX lookup raises ``ARES_ENOTFOUND``.

    Records an MX query that raises and an A query that returns a fake
    answer, then expects a static relay to be created for ``example.com``
    on port 25 and the envelope to be handed to it.
    """
    domain = 'example.com'
    envelope = Envelope('sender@example.com', ['rcpt@example.com'])
    a_answer = FakeAAnswer(False, [('1.2.3.4', )])
    relay = MxSmtpRelay()
    static_relay = self.mox.CreateMock(StaticSmtpRelay)
    self.mox.StubOutWithMock(relay, 'new_static_relay')
    self.mox.StubOutWithMock(DNSResolver, 'query')

    # Record phase: the calls below only register expectations.
    mx_lookup = DNSResolver.query(domain, 'MX')
    mx_lookup.AndRaise(DNSError(ARES_ENOTFOUND))
    a_lookup = DNSResolver.query(domain, 'A')
    a_lookup.AndReturn(FakeAsyncResult(a_answer))
    relay_factory_call = relay.new_static_relay(domain, 25)
    relay_factory_call.AndReturn(static_relay)
    static_relay.attempt(envelope, 0)

    # Replay phase: run the code under test against the recorded calls.
    self.mox.ReplayAll()
    relay.attempt(envelope, 0)
def test_check_dnsrbl(self):
    """Exercise the ``check_dnsbl`` decorator with two recorded lookups.

    The first recorded lookup raises ``DNSError(ARES_ENOTFOUND)``; the test
    then expects the wrapped ``validate_mail`` to be reached (it raises
    ``AssertionError``) and the reply to be left untouched. The second
    recorded lookup returns a ``FakeAsyncResult``; the test then expects no
    exception and the reply to be rewritten to ``550 5.7.1 Access denied``.
    """
    class TestSession(object):
        address = ('1.2.3.4', 56789)

    class TestValidators(object):
        def __init__(self):
            self.session = TestSession()

        @check_dnsbl('test.example.com')
        def validate_mail(self, reply, sender):
            # Raise explicitly instead of ``assert False``: assert
            # statements are removed under ``python -O``, which would make
            # this method silently return and break the check below.
            raise AssertionError('validate_mail should not have been called')

    # Record phase: same query twice, first raising, then returning.
    DNSResolver.query('4.3.2.1.test.example.com', 'A').AndRaise(DNSError(ARES_ENOTFOUND))
    DNSResolver.query('4.3.2.1.test.example.com', 'A').AndReturn(FakeAsyncResult())
    self.mox.ReplayAll()

    validators = TestValidators()
    reply = Reply('250', '2.0.0 Ok')

    # First call: the wrapped method body runs and the reply is unchanged.
    with self.assertRaises(AssertionError):
        validators.validate_mail(reply, 'asdf')
    self.assertEqual('250', reply.code)
    self.assertEqual('2.0.0 Ok', reply.message)

    # Second call: no exception, and the reply now carries the rejection.
    validators.validate_mail(reply, 'asdf')
    self.assertEqual('550', reply.code)
    self.assertEqual('5.7.1 Access denied', reply.message)
def test_attempt_no_records(self):
    """Expect ``PermanentRelayError`` when both MX and A lookups raise.

    Both recorded ``example.com`` queries raise ``DNSError(ARES_ENOTFOUND)``.
    """
    envelope = Envelope('sender@example.com', ['rcpt@example.com'])
    relay = MxSmtpRelay()
    self.mox.StubOutWithMock(relay, 'new_static_relay')
    self.mox.StubOutWithMock(DNSResolver, 'query')

    # Record the MX lookup first, then the A lookup; both raise.
    for record_type in ('MX', 'A'):
        lookup = DNSResolver.query('example.com', record_type)
        lookup.AndRaise(DNSError(ARES_ENOTFOUND))
    self.mox.ReplayAll()

    self.assertRaises(PermanentRelayError, relay.attempt, envelope, 0)
def test_dnsblocklist_get(self):
    """Check ``get`` and ``in`` on ``self.dnsbl`` against recorded lookups.

    The query for ``1.2.3.4`` returns a ``FakeAsyncResult`` and is expected
    to be truthy; the query for ``5.6.7.8`` raises
    ``DNSError(ARES_ENOTFOUND)`` and is expected not to be contained.
    """
    listed_lookup = DNSResolver.query('4.3.2.1.test.example.com', 'A')
    listed_lookup.AndReturn(FakeAsyncResult())
    unlisted_lookup = DNSResolver.query('8.7.6.5.test.example.com', 'A')
    unlisted_lookup.AndRaise(DNSError(ARES_ENOTFOUND))
    self.mox.ReplayAll()

    was_listed = self.dnsbl.get('1.2.3.4')
    self.assertTrue(was_listed)
    self.assertNotIn('5.6.7.8', self.dnsbl)
from collections import namedtuple
from functools import partial
from gevent.hub import get_hub, Waiter
# Two-field record type exposing ``value`` and ``exception``.
Result = namedtuple('Result', field_names=('value', 'exception'))
class Resolver(object):
_ares_flag_map = [(getattr(socket, 'NI_NUMERICHOST', 1), pycares.ARES_NI_NUMERICHOST),
(getattr(socket, 'NI_NUMERICSERV', 2), pycares.ARES_NI_NUMERICSERV),
(getattr(socket, 'NI_NOFQDN', 4), pycares.ARES_NI_NOFQDN),
(getattr(socket, 'NI_NAMEREQD', 8), pycares.ARES_NI_NAMEREQD),
(getattr(socket, 'NI_DGRAM', 16), pycares.ARES_NI_DGRAM)]
_ares_errno_map = {pycares.errno.ARES_ENOTFOUND: (socket.gaierror, (8, 'nodename nor servname provided, or not known')),
pycares.errno.ARES_ENODATA: (socket.gaierror, (8, 'nodename nor servname provided, or not known'))}
_ares_errno_map2 = {pycares.errno.ARES_ENOTFOUND: (socket.herror, (1, 'Unknown host')),
pycares.errno.ARES_ENODATA: (socket.gaierror, (8, 'nodename nor servname provided, or not known'))}
_addrinfo_errno_map = {pyuv.errno.UV_ENOENT: (socket.gaierror, (8, 'nodename nor servname provided, or not known'))}
def __init__(self, hub=None):
    """Create the resolver's channel, timer and descriptor map.

    :param hub: Hub to attach to; when falsy, the value returned by
                ``get_hub()`` is used instead.
    """
    if not hub:
        hub = get_hub()
    self.hub = hub
    # The channel reports socket state changes through ``_sock_state_cb``.
    self._channel = pycares.Channel(sock_state_cb=self._sock_state_cb)
    uv_loop = self.hub.loop._loop
    self._timer = pyuv.Timer(uv_loop)
    # Looked up by file descriptor in ``_sock_state_cb``.
    self._fd_map = dict()
def _sock_state_cb(self, fd, readable, writable):
if readable or writable:
if fd not in self._fd_map:
# New socket
handle = pyuv.Poll(self.hub.loop._loop, fd)
:param ip: The IP address string to check.
:param timeout: A timeout in seconds before ``False`` is returned.
:param strict: If ``True``, DNS exceptions that are not ``NXDOMAIN``
(including timeouts) will also result in a ``True``
return value.
:returns: ``True`` if the DNSBL had an entry for the given IP address,
``False`` otherwise.
"""
with gevent.Timeout(timeout, None):
query = self._build_query(ip)
try:
DNSResolver.query(query, 'A').get()
except DNSError as exc:
if exc.errno == ARES_ENOTFOUND:
return False
logging.log_exception(__name__, query=query)
return not strict
else:
return True
return strict