Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.exploit_mode = False
# For error handling, the first "last response" is set to SUCCESS to
# allow the _should_stop_scan method to match it's "SFFFF...FFF" pattern
self._last_responses = deque(maxlen=MAX_RESPONSE_COLLECT)
self._last_responses.extend([ResponseMeta(True, SUCCESS)] * 100)
self._count_lock = threading.RLock()
# For rate limiting and timeouts
self._rate_limit_last_time_called = 0.0
self._rate_limit_lock = threading.RLock()
self._adjust_timeout_lock = threading.RLock()
self._adjust_timeout_last_call = 0.0
# Keep track of sum(rtt) for each debugging_id
self._rtt_sum_debugging_id = SynchronizedLRUDict(capacity=128)
# Avoid multiple consecutive calls to reduce the worker pool size
self._last_call_to_adjust_workers = None
self._should_adjust_workers_lock = threading.RLock()
# For timeout auto adjust and general stats
self._total_requests = 0
# Timeout is kept by host
self._host_timeout = {}
self._global_timeout = DEFAULT_TIMEOUT
# Used in the pause on HTTP error feature to keep track of when the
# core slept waiting for the remote end to be reachable
self._sleep_log = {}
self._clear_sleep_log()
def __init__(self, func, lru_size=10):
self.func = func
self.cache = SynchronizedLRUDict(lru_size)
This method was taken from:
# $Id: download.py,v 1.30 2004/05/13 09:55:30 torh Exp $
That is part of :
swup-0.0.20040519/
Developed by:
# Copyright 2001 - 2003 Trustix AS -
# Copyright 2003 - 2004 Tor Hveem -
# Copyright 2004 Omar Kilani for tinysofa -
"""
om.out.debug('Enabling _dns_cache()')
if not hasattr(socket, 'already_configured'):
socket._getaddrinfo = socket.getaddrinfo
_dns_cache = SynchronizedLRUDict(200)
def _caching_getaddrinfo(*args, **kwargs):
query = (args)
try:
res = _dns_cache[query]
#This was too noisy and not so useful
#om.out.debug('Cached DNS response for domain: ' + query[0] )
return res
except KeyError:
res = socket._getaddrinfo(*args, **kwargs)
_dns_cache[args] = res
msg = 'DNS response from DNS server for domain: %s'
om.out.debug(msg % query[0])
return res
def __init__(self):
super(ParserCache, self).__init__()
self._cache = SynchronizedLRUDict(self.CACHE_SIZE)
self._can_parse_cache = SynchronizedLRUDict(self.CACHE_SIZE * 10)
self._parser_finished_events = {}
self._parser_blacklist = DiskSet()
def __init__(self, url_opener):
self._url_opener = url_opener
# Cache to measure RTT
self._rtt_mutant_cache = SynchronizedLRUDict(capacity=128)
self._rtt_mutant_lock = threading.RLock()
self._specific_rtt_mutant_locks = dict()
def __init__(self):
self._cache = SynchronizedLRUDict(self.MAX_SIZE)