How to use the aiodns.error.DNSError function in aiodns

To help you get started, we’ve selected a few aiodns examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github aio-libs / aiohttp / tests / test_resolver.py View on Github external
async def test_async_resolver_negative_lookup(loop) -> None:
    with patch('aiodns.DNSResolver') as mock:
        mock().gethostbyname.side_effect = aiodns.error.DNSError()
        resolver = AsyncResolver(loop=loop)
        with pytest.raises(OSError):
            await resolver.resolve('doesnotexist.bla')
github constverum / ProxyBroker / proxybroker / resolver.py View on Github external
async def _resolve(self, host, qtype):
        try:
            resp = await asyncio.wait_for(
                self._resolver.query(host, qtype), timeout=self._timeout
            )
        except (aiodns.error.DNSError, asyncio.TimeoutError):
            raise ResolveError
        else:
            return resp
github magma / magma / lte / gateway / python / magma / pipelined / redirect.py View on Github external
def add_flows(dns_resolve_future):
            """
            Callback for when DNS query is resolved, adds the bypass flows
            """
            try:
                ips = [entry.host for entry in dns_resolve_future.result()]
                ttl = min(entry.ttl for entry in dns_resolve_future.result())
            except aiodns.error.DNSError as err:
                self.logger.error("Error: ip lookup for {}: {}".format(
                                  redirect_addr_host, err))
                return
            self._dns_cache.get(redirect_addr_host, lambda: ips, max_age=ttl)
            self._install_ipv4_bypass_flows(datapath, imsi, rule, rule_num,
                                            rule_version, priority, ips, ue_ip)
github nnewsom / webbies / lib / DNSResolver.py View on Github external
def query_name(self,hostname):
        if hostname not in self.lookup_history:
            try:
                ips = yield from self.resolver.query(hostname,'A')
            except aiodns.error.DNSError:
                ips = []
            self.lookup_history[hostname] = ips
        return self.lookup_history[hostname]
github blark / aiodnsbrute / aiodnsbrute / cli.py View on Github external
self.resolver.nameservers = resolvers
        self.logger.info(
            f"Using recursive DNS with the following servers: {self.resolver.nameservers}"
        )

        if wildcard:
            # 63 chars is the max allowed segment length, there is practically no chance that it will be a legit record
            random_sld = (
                lambda: f'{"".join(random.choice(string.ascii_lowercase + string.digits) for i in range(63))}'
            )
            try:
                self.lookup_type = "query"
                wc_check = self.loop.run_until_complete(
                    self._dns_lookup(f"{random_sld()}.{domain}")
                )
            except aiodns.error.DNSError as err:
                # we expect that the record will not exist and error 4 will be thrown
                self.logger.info(
                    f"No wildcard response was detected for this domain."
                )
                wc_check = None
            finally:
                if wc_check is not None:
                    self.ignore_hosts = [host.host for host in wc_check]
                    self.logger.warn(
                        f"Wildcard response detected, ignoring answers containing {self.ignore_hosts}"
                    )
        else:
            self.logger.warn("Wildcard detection is disabled")

        if query:
            self.logger.info(
github madedotcom / photon-pump / photonpump / discovery.py View on Github external
)
            try:
                result = await self.resolver.query(self.name, "A")
                random.shuffle(result)

                if result:
                    LOG.debug("Found %s hosts for name %s", len(result), self.name)
                    current_attempt = 0
                    self.seeds = [
                        NodeService(address=node.host, port=self.port, secure_port=None)
                        for node in result
                        if node.host != self.failed_node.address
                    ]

                    break
            except aiodns.error.DNSError:
                LOG.warning(
                    "Failed to fetch gossip seeds for dns name %s",
                    self.name,
                    exc_info=True,
                )
            current_attempt += 1
            await asyncio.sleep(1)
github surycat / nyuki-legacy / nyuki / discovery / dns.py View on Github external
async def periodic_query(self):
        while True:
            try:
                answers = await self._resolver.query(self._entry, 'A')
            except DNSError as exc:
                log.error("DNS query failed for discovery service")
                log.debug("DNS failure reason: %s", str(exc))
                await asyncio.sleep(self._RETRY_PERIOD)
                continue
            addresses = [record.host for record in answers]

            # Trigger callbacks for discovered instances IPs
            for callback in self._callbacks:
                coro = (
                    callback if asyncio.iscoroutinefunction(callback)
                    else asyncio.coroutine(callback)
                )
                asyncio.ensure_future(coro(addresses))

            # Periodically execute this method
            await asyncio.sleep(self._period)
github home-assistant / home-assistant / homeassistant / components / sensor / dnsip.py View on Github external
async def async_update(self):
        """Get the current DNS IP address for hostname."""
        from aiodns.error import DNSError
        try:
            response = await self.resolver.query(self.hostname,
                                                 self.querytype)
        except DNSError as err:
            _LOGGER.warning("Exception while resolving host: %s", err)
            response = None
        if response:
            self._state = response[0].host
        else:
            self._state = None
github synackray / vcenter-netbox-sync / run.py View on Github external
"""
    result = (ip, "")
    allowed_chars = "abcdefghijklmnopqrstuvwxyz0123456789-."
    log.info("Requesting PTR record for %s.", ip)
    try:
        resp = await resolver.gethostbyaddr(ip)
        # Make sure records comply to NetBox and DNS expected format
        if all([bool(c.lower() in allowed_chars) for c in resp.name]):
            result = (ip, resp.name.lower())
            log.debug("PTR record for %s is '%s'.", ip, result[1])
        else:
            log.debug(
                "Invalid characters detected in PTR record '%s'. Nulling.",
                resp.name
                )
    except aiodns.error.DNSError as err:
        log.info("Unable to find record for %s: %s", ip, err.args[1])
    return result

aiodns

Simple DNS resolver for asyncio

MIT
Latest version published 9 months ago

Package Health Score

87 / 100
Full package analysis

Similar packages