Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
resp = self.query('http://www.iana.org/_css/2013.1/print.css',
resolveRevisits='1',
fields='urlkey,orig.length,orig.offset,orig.filename'
)
assert resp.status_code == 200
assert resp.content_type == 'text/x-cdxj'
cdxes = resp.text.splitlines()
cdx = cdxes[0]
cdx = CDXObject(cdx.encode('utf-8'))
assert cdx['orig.offset'] == '-'
assert cdx['orig.length'] == '-'
assert cdx['orig.filename'] == '-'
for cdx in cdxes[1:]:
cdx = CDXObject(cdx.encode('utf-8'))
assert cdx['orig.offset'] != '-'
assert cdx['orig.length'] != '-'
assert cdx['orig.filename'] == 'iana.warc.gz'
def test_filters_3(self):
    """
    Query screen.css with the '!'-prefixed mime and filename filters.

    Exactly one row is expected back, and it must be the text/css capture
    stored in iana.warc.gz (i.e. neither of the filtered values).
    """
    response = self.query(
        'http://www.iana.org/_css/2013.1/screen.css',
        filter=('!mime:warc/revisit', '!filename:dupes.warc.gz'),
    )
    assert response.status_code == 200
    assert response.content_type == 'text/x-cdxj'

    rows = response.text.splitlines()
    assert len(rows) == 1

    # Field/value pairs every returned record has to carry, checked in order.
    expected_fields = (
        ('urlkey', 'org,iana)/_css/2013.1/screen.css'),
        ('timestamp', '20140126200625'),
        ('mime', 'text/css'),
        ('filename', 'iana.warc.gz'),
    )
    for row in rows:
        record = CDXObject(row.encode('utf-8'))
        for field, value in expected_fields:
            assert record[field] == value
def test_filters_1(self):
    """
    Query screen.css filtered on mime 'warc/revisit' and filename
    'dupes.warc.gz'.

    At least one row must come back, and every row must be the revisit
    record from dupes.warc.gz.
    """
    response = self.query(
        'http://www.iana.org/_css/2013.1/screen.css',
        filter=('mime:warc/revisit', 'filename:dupes.warc.gz'),
    )
    assert response.status_code == 200
    assert response.content_type == 'text/x-cdxj'

    rows = response.text.splitlines()
    assert len(rows) > 0

    # Field/value pairs every returned record has to carry, checked in order.
    expected_fields = (
        ('urlkey', 'org,iana)/_css/2013.1/screen.css'),
        ('timestamp', '20140127171239'),
        ('mime', 'warc/revisit'),
        ('filename', 'dupes.warc.gz'),
    )
    for row in rows:
        record = CDXObject(row.encode('utf-8'))
        for field, value in expected_fields:
            assert record[field] == value
key = params['key']
key_exact = key + self.EXACT_SUFFIX_B
tld = key.split(b',')[0]
for acl in acl_iter:
# skip empty/invalid lines
if not acl:
continue
acl_key = acl.split(b' ')[0]
if key_exact == acl_key:
return CDXObject(acl)
if key.startswith(acl_key):
return CDXObject(acl)
# if acl key already less than first tld,
# no match can be found
if acl_key < tld:
break
return self.default_rule
def do_load(index_list):
    """Lazily yield a CDXObject for each entry of ``index_list``.

    Entries that are ``str`` are UTF-8 encoded first; anything else is
    handed to ``CDXObject`` unchanged.
    """
    for entry in index_list:
        raw = entry.encode('utf-8') if isinstance(entry, str) else entry
        yield CDXObject(raw)
no_except_close(res)
raise NotFoundException(url)
if res and res.headers.get('Memento-Datetime'):
if res.status_code >= 400:
no_except_close(res)
raise NotFoundException(url)
if res.status_code >= 300:
info = self._extract_location(url, res.headers.get('Location'))
else:
info = self._extract_location(url, res.headers.get('Content-Location'))
url, timestamp, load_url = info
cdx = CDXObject()
cdx['urlkey'] = canonicalize(url)
cdx['timestamp'] = timestamp
cdx['url'] = url
cdx['load_url'] = load_url
if 'Referer' in headers:
cdx['set_referrer'] = headers['Referer']
return iter([cdx])
def _add_rule(self, url, access, exact_match=False):
"""Adds an rule to the acl file
:param str url: The URL for the rule
:param str access: The access value for the rule
:param bool exact_match: Is the rule to be added an exact match
:rtype: None
"""
# NOTE(review): this listing has lost its indentation and appears to stop
# inside the search loop below -- `i` and `replace` are assigned but never
# read in the visible lines. Confirm against the full method before editing.
# Return without doing anything when the access value fails validation.
if not self.validate_access(access):
return
# Assemble the candidate rule: the key comes from self.to_key(url, exact_match)
# and the timestamp is set to the '-' placeholder.
acl = CDXObject()
acl['urlkey'] = self.to_key(url, exact_match)
acl['timestamp'] = '-'
acl['access'] = access
acl['url'] = url
i = 0
replace = False
# Walk the existing rules looking for one with the same urlkey and timestamp.
for rule in self.rules:
if acl['urlkey'] == rule['urlkey'] and acl['timestamp'] == rule['timestamp']:
# Same key and timestamp as an existing rule: flag it and stop scanning.
replace = True
break
# Stop scanning as soon as the new rule compares greater than the current one.
if acl > rule:
break
def do_load(lines):
    """Lazily yield a CDXObject for every non-empty entry of ``lines``.

    Each record is passed through ``self._set_load_url`` together with the
    enclosing ``params`` before it is yielded; falsy entries are skipped.
    """
    for raw in lines:
        if raw:
            record = CDXObject(raw)
            self._set_load_url(record, params)
            yield record