def setup(self, settings):
    '''
    Set up Redis and tldextract.
    '''
    self.extract = tldextract.TLDExtract()
    self.redis_conn = redis.Redis(host=settings['REDIS_HOST'],
                                  port=settings['REDIS_PORT'],
                                  db=settings.get('REDIS_DB'))
    try:
        self.redis_conn.info()
        self.logger.debug("Connected to Redis in ScraperHandler")
    except ConnectionError:  # redis.exceptions.ConnectionError
        self.logger.error("Failed to connect to Redis in ScraperHandler")
        # plugin is essential to functionality
        sys.exit(1)
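# For context, a minimal sketch (not from the original project) of the
# settings mapping this setup() expects; the key names come from the code
# above, the values here are assumptions:
settings = {
    'REDIS_HOST': 'localhost',  # assumed default
    'REDIS_PORT': 6379,         # assumed default
    'REDIS_DB': 0,              # optional; settings.get() tolerates absence
}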
urldict = {}
skipped = 0
for urldir in os.listdir(outputdir):
    if urldir in INFOFILES:
        continue
    try:
        urlfile = os.path.join(outputdir, urldir, '__urls')
        related_urls = get_unique_urls(urldir, urlfile)
    except (IOError, OSError) as einfo:
        print("Unable to read", urldir, einfo, "skipping")
        continue
    TLDio = TLDExtract(cache_file='mozilla_tld_file.dat')
    for dirty_url in related_urls:
        # dirty_url because it may contain ":"
        if ':' in dirty_url:
            url = dirty_url.split(':')[0]
        else:
            url = dirty_url
        if url in urldict:
            skipped += 1
            continue
        dnsplit = TLDio(url)
        urldict.update({url: {
            'domain': dnsplit.domain,
            'tld': dnsplit.suffix,
        }})
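# For reference, a quick illustration (hostname made up) of the ExtractResult
# fields the loop above reads:
illustrative = TLDExtract(cache_file='mozilla_tld_file.dat')('www.example.co.uk')
print(illustrative.domain, illustrative.suffix)  # -> example co.uk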
# A ':' delimits either the scheme from the host, or the host from the
# port. If there's a scheme we want to limit it to http or https.
if ':' in url:
    scheme, host = url.split(':', 1)
    # If there's a . in the scheme, then there wasn't a scheme
    # and the : is delimiting the host from the port
    if '.' not in scheme and scheme not in ('http', 'https'):
        return u''

# Get a thread-local extractor if there is one. If not, create it.
extractor = getattr(_cached_tldextract, 'extractor', None)
if extractor is None:
    # FIXME - This uses the tld set included with tldextract which
    # will age over time. We should fix this so that we get a new
    # file on deployment and use that file.
    extractor = tldextract.TLDExtract(
        suffix_list_url=None,  # disable fetching the file via http
    )
    _cached_tldextract.extractor = extractor

res = extractor(url)
# If there's no tld, then this is probably an ip address or
# localhost. Also ignore .mil and in-addr.arpa addresses.
if res.suffix in ('', 'mil', 'in-addr.arpa'):
    return u''

# Suffix is the tld. We want that plus the next level up.
return res.domain.decode('utf-8') + u'.' + res.suffix.decode('utf-8')
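# The getattr/setattr pair above implements a per-thread cache, so each
# thread builds its extractor exactly once. A minimal sketch of the missing
# setup, assuming _cached_tldextract is a plain threading.local() instance:
import threading

import tldextract

_cached_tldextract = threading.local()

def _get_extractor():
    # Build one TLDExtract per thread, lazily, on first use.
    extractor = getattr(_cached_tldextract, 'extractor', None)
    if extractor is None:
        extractor = tldextract.TLDExtract(
            suffix_list_url=None,  # keyword as in the snippet above (older tldextract)
        )
        _cached_tldextract.extractor = extractor
    return extractor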
>>> d = ParsedURL("http://www.example.com/")
>>> d.split_hostname()
('www', 'example', 'com')
>>> d = ParsedURL("http://some.subdomain.of.example.co.uk/")
>>> d.split_hostname()
('some.subdomain.of', 'example', 'co.uk')
>>> '.'.join(d.split_hostname())
'some.subdomain.of.example.co.uk'
:param hostname: Hostname to split.
:type hostname: str
:returns: Subdomain, domain and TLD.
:rtype: tuple(str, str, str)
"""
extract = TLDExtract(fetch=False)
result = extract(to_utf8(hostname))
return result.subdomain, result.domain, result.suffix
def get_domain(self, url):
    no_fetch_extract = tldextract.TLDExtract(suffix_list_urls=None)
    tld = no_fetch_extract(url)
    self.domain = "{}.{}".format(tld.domain, tld.suffix)
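# Illustration only (URL made up): the attribute ends up holding the
# registrable domain:
no_fetch_extract = tldextract.TLDExtract(suffix_list_urls=None)
tld = no_fetch_extract('http://forums.news.bbc.co.uk/')
print("{}.{}".format(tld.domain, tld.suffix))  # bbc.co.uk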
def handle_noargs(self, **options):
    self.setup_logging(verbosity=options.get('verbosity', 1))
    filename = getattr(
        settings, 'MULTISITE_PUBLIC_SUFFIX_LIST_CACHE',
        os.path.join(tempfile.gettempdir(), 'multisite_tld.dat')
    )
    self.log("Updating {filename}".format(filename=filename))
    extract = tldextract.TLDExtract(cache_file=filename)
    extract.update(fetch_now=True)
    self.log("Done.")
def advanced_url_parse(self, url):
    if HAVE_TLDEXTRACT:
        EXTRA_SUFFIXES = ('bit',)
        parsed = False
        try:
            parsed = tldextract.TLDExtract(extra_suffixes=EXTRA_SUFFIXES,
                                           suffix_list_urls=None)(url)
        except Exception as e:
            log.error(e)
        return parsed
    else:
        log.info("missing tldextract dependency")
DEFAULT_REMOTE_BLACKLISTS = [
    'http://winhelp2002.mvps.org/hosts.txt',
    'http://someonewhocares.org/hosts/hosts',
    'https://raw.githubusercontent.com/jmdugan/blocklists/master/corporations/facebook/all',
]
ALL_REMOTE_BLACKLISTS = [
    'http://winhelp2002.mvps.org/hosts.txt',
    'http://someonewhocares.org/hosts/hosts',
    'https://adaway.org/hosts.txt',
    'https://raw.githubusercontent.com/StevenBlack/hosts/master/data/StevenBlack/hosts',
    'http://www.malwaredomainlist.com/hostslist/hosts.txt',
    'http://pgl.yoyo.org/adservers/serverlist.php?hostformat=hosts;showintro=0',
]
# http://hosts-file.net/?s=Download
CACHE_EXPIRE = 3600 * 24 * 2  # 48 hours
TLD_EXTRACT = tldextract.TLDExtract(cache_file=TLDEXTRACT_CACHE)
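# A sketch of how a module-level extractor like TLD_EXTRACT might be applied
# to hosts pulled from those blacklists; registered_domain() is hypothetical:
def registered_domain(host):
    # Collapse any hostname to its registrable domain,
    # e.g. 'ads.tracker.example.co.uk' -> 'example.co.uk'.
    parts = TLD_EXTRACT(host)
    return '.'.join(p for p in (parts.domain, parts.suffix) if p)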
def extract(self):
    """
    Extract the domain parts.

    >>> d = Domain('www.example.com')
    >>> d.extract()
    ExtractResult(subdomain='www', domain='example', suffix='com')

    :return: extraction result
    """
    extract_cache_file = config.data_storage_path.joinpath('public_suffix_list.dat')
    tldext = tldextract.TLDExtract(extract_cache_file)
    result = self.match()
    if result:
        return tldext(result)
    else:
        return None