Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
else:
urlrewriter = UrlRewriter(wb_url,
prefix=full_prefix,
full_prefix=full_prefix,
rel_prefix=rel_prefix,
pywb_static_prefix=pywb_static_prefix)
framed_replay = self.framed_replay
url_parts = urlsplit(wb_url.url)
if not url_parts.path:
return self.send_redirect('/', url_parts, urlrewriter)
self.unrewrite_referrer(environ, full_prefix)
urlkey = canonicalize(wb_url.url)
inputreq = RewriteInputRequest(environ, urlkey, wb_url.url, content_rw)
inputreq.include_method_query(wb_url.url)
range_start, range_end, skip_record = self._check_range(inputreq, wb_url)
setcookie_headers = None
cookie_key = None
if self.cookie_tracker:
cookie_key = self.get_cookie_key(kwargs)
if cookie_key:
res = self.cookie_tracker.get_cookie_headers(wb_url.url,
urlrewriter,
cookie_key,
environ.get('HTTP_COOKIE', ''))
def links_to_cdxobject(self, link_header, def_name):
# Build a CDXObject for each memento entry found in a parsed Link header.
# `link_header` and `def_name` are passed straight to
# MementoUtils.parse_links; the parsed result is read via its 'original'
# and 'mementos' keys.
# NOTE(review): in this excerpt the populated `cdx` is never yielded,
# returned, or collected -- the listing appears to be cut off after the
# last assignment. Confirm against the full source before relying on it.
results = MementoUtils.parse_links(link_header, def_name)
# The original URL and its canonicalized key are shared by every entry.
original = results['original']['url']
key = canonicalize(original)
mementos = results['mementos']
for val in mementos:
# Convert the memento's 'datetime' value into the timestamp stored on the cdx.
dt = val['datetime']
ts = http_date_to_timestamp(dt)
cdx = CDXObject()
cdx['urlkey'] = key
cdx['timestamp'] = ts
cdx['url'] = original
# 'rel' is optional on a memento entry; default to an empty string.
cdx['mem_rel'] = val.get('rel', '')
cdx['memento_url'] = val['url']
# The load URL is derived from the entry's timestamp and the original URL.
load_url = self._get_replay_url(cdx['timestamp'], original)
cdx['load_url'] = load_url
def to_key(self, url_or_surt, exact_match=False):
    """Convert a URL or SURT into an ACL key.

    A value that matches ``self.SURT_RX`` is treated as already being a
    SURT and is used unchanged; any other value is passed through
    ``canonicalize``. When ``exact_match`` is true,
    ``AccessChecker.EXACT_SUFFIX`` is appended to the key.

    :param str url_or_surt: The url or surt to be converted to an acl key
    :param bool exact_match: Should the exact match suffix be added to key
    :rtype: str
    """
    already_surt = self.SURT_RX.search(url_or_surt)
    acl_key = url_or_surt if already_surt else canonicalize(url_or_surt)

    if not exact_match:
        return acl_key

    return acl_key + AccessChecker.EXACT_SUFFIX
def render_content(self, wbrequest):
# Prepare an upstream request for the URL carried by `wbrequest.wb_url`.
# NOTE(review): this excerpt ends partway through the `upstream_url`
# format call, so only the request-preparation steps are documented here.
# Requests with the 'vi_' modifier are handed off to _get_video_info.
if wbrequest.wb_url.mod == 'vi_':
return self._get_video_info(wbrequest)
# If a referrer wburl string can be extracted, replace HTTP_REFERER in the
# request environ with the `.url` of the WbUrl parsed from it.
ref_wburl_str = wbrequest.extract_referrer_wburl_str()
if ref_wburl_str:
wbrequest.env['HTTP_REFERER'] = WbUrl(ref_wburl_str).url
urlkey = canonicalize(wbrequest.wb_url.url)
url = wbrequest.wb_url.url
inputreq = RewriteInputRequest(wbrequest.env, urlkey, url,
self.content_rewriter)
# The reconstructed request data is described by the headers below:
# its length plus the 'application/request' content type.
req_data = inputreq.reconstruct_request(url)
headers = {'Content-Length': len(req_data),
'Content-Type': 'application/request'}
# `closest` is 'now' for a latest-replay URL, otherwise the URL's timestamp.
if wbrequest.wb_url.is_latest_replay():
closest = 'now'
else:
closest = wbrequest.wb_url.timestamp
# The upstream URL template receives the quoted target URL; the remaining
# format arguments are not visible in this excerpt.
upstream_url = self.upstream_url.format(url=quote(url),
not append_post):
continue
elif (not include_all and
record.content_type == 'application/warc-fields'):
continue
entry = self.parse_warc_record(record)
elif record.format == 'arc':
entry = self.parse_arc_record(record)
if not entry:
continue
if entry.get('url') and not entry.get('urlkey'):
entry['urlkey'] = canonicalize(entry['url'], surt_ordered)
compute_digest = False
if (entry.get('digest', '-') == '-' and
record.rec_type not in ('revisit', 'request', 'warcinfo')):
compute_digest = True
elif not minimal and record.rec_type == 'request' and append_post:
method = record.http_headers.protocol
len_ = record.http_headers.get_header('Content-Length')
post_query = MethodQueryCanonicalizer(method,
entry.get('_content_type'),
len_,
record.raw_stream)
if len(ts_err) > 1 and ts_err[0] != 'file:':
url = 'http://' + ts_err[1]
if url.startswith('//'):
url = 'http:' + url
if remote_only or is_http(url):
is_remote = True
else:
is_remote = False
if not url.startswith('file:'):
url = to_file_url(url)
# explicit urlkey may be passed in (say for testing)
if not urlkey:
urlkey = canonicalize(url)
if is_remote:
(status_headers, stream) = self.fetch_http(url, urlkey, env,
req_headers,
follow_redirects,
skip_recording,
verify)
else:
(status_headers, stream) = self.fetch_local_file(url)
if timestamp is None:
timestamp = timestamp_now()
cdx = {'urlkey': urlkey,
'timestamp': timestamp,
'url': url,
def convert_to_cdxj(self):
    """Convert each file from ``self.iter_cdx_files()`` to a ``<name>j`` file.

    Every input line is parsed into a ``CDXObject``, its ``URLKEY`` field is
    recomputed with ``canonicalize`` from the ``ORIGINAL`` field, and the
    entry is written through a ``CDXJ`` writer. Output goes to a
    ``<name>j.tmp`` file first, which is then moved into place, after which
    the source file is removed.
    """
    writer = CDXJ()

    for cdx_path in self.iter_cdx_files():
        cdxj_path = cdx_path + 'j'
        tmp_path = cdxj_path + '.tmp'

        print('Converting {0} -> {1}'.format(cdx_path, cdxj_path))

        with open(tmp_path, 'w+') as dest, open(cdx_path, 'rb') as src:
            for raw_line in src:
                # Lines starting with b' CDX' are not converted
                # (presumably the CDX format header line).
                if not raw_line.startswith(b' CDX'):
                    entry = CDXObject(raw_line)
                    entry[URLKEY] = canonicalize(entry[ORIGINAL])
                    writer.write_cdx_line(dest, entry, entry['filename'])

        # Only replace the final output once the temp file is fully written.
        shutil.move(tmp_path, cdxj_path)
        os.remove(cdx_path)