Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def handle_connect(self, env):
sock = self.get_request_socket(env)
if not sock:
return WbResponse.text_response('HTTPS Proxy Not Supported',
'405 HTTPS Proxy Not Supported')
sock.send(b'HTTP/1.0 200 Connection Established\r\n')
sock.send(b'Proxy-Connection: close\r\n')
sock.send(b'Server: pywb proxy\r\n')
sock.send(b'\r\n')
hostname, port = env['REL_REQUEST_URI'].split(':')
if not self.use_wildcard:
certfile = self.ca.cert_for_host(hostname)
else:
certfile = self.ca.get_wildcard_cert(hostname)
try:
ssl_sock = ssl.wrap_socket(sock,
rewritten_url = rewriter.rewrite(rel_request_uri)
# if post, can't redirect as that would lose the post data
# (can't use 307 because FF will show confirmation warning)
if ref_request.method == 'POST':
new_wb_url = WbUrl(rewritten_url[len(rewriter.prefix):])
ref_request.wb_url.url = new_wb_url.url
return ref_route.handler(ref_request)
final_url = urlunsplit((ref_split.scheme,
ref_split.netloc,
rewritten_url,
'',
''))
return WbResponse.redir_response(final_url, status='302 Temp Redirect')
def handle_magic_page(self, env):
    """Answer a "magic page" request with a small JSON document.

    The document has three keys: ``ip`` (the result of ``self._get_ip``),
    and ``coll`` / ``ts`` (the pair returned by
    ``self.get_proxy_coll_ts``), all computed from the WSGI ``env``.
    The response is sent with an ``application/json`` content type.
    """
    collection, timestamp = self.get_proxy_coll_ts(env)
    client_ip = self._get_ip(env)

    # Key order is kept as ip, coll, ts so the serialized text is unchanged.
    payload = {'ip': client_ip, 'coll': collection, 'ts': timestamp}
    return WbResponse.text_response(json.dumps(payload),
                                    content_type='application/json')
def render_response(self, **kwargs):
    """Render the template and wrap the resulting text in a response.

    Every keyword argument is forwarded to ``self.render_to_string``.
    Two of them are additionally used for the response itself:

    * ``status`` -- defaults to ``'200 OK'``
    * ``content_type`` -- defaults to ``'text/html; charset=utf-8'``
    """
    rendered = self.render_to_string(**kwargs)
    return WbResponse.text_response(
        rendered,
        status=kwargs.get('status', '200 OK'),
        content_type=kwargs.get('content_type', 'text/html; charset=utf-8'))
def __call__(self, wbrequest):
    """Run a cdx query for ``wbrequest`` and stream the result.

    The query parameters are extracted from the request's WSGI env and
    handed to ``self.index_handler.load_cdx``.  When that raises
    ``NotFoundException`` a ``404 Not Found`` response is returned whose
    body is the error message: a JSON object ``{"error": ...}`` if the
    ``output`` parameter is ``'json'``, plain text otherwise.  On success
    the cdx iterator is streamed back as ``text/plain``.
    """
    query = self.extract_params_from_wsgi_env(wbrequest.env)

    try:
        captures = self.index_handler.load_cdx(wbrequest, query)
    except NotFoundException:
        error_text = 'No Captures found for: ' + query.get('url')
        wants_json = query.get('output') == 'json'
        if wants_json:
            error_text = json.dumps(dict(error=error_text))
        return WbResponse.text_response(
            error_text,
            content_type='application/json' if wants_json else 'text/plain',
            status='404 Not Found')
    else:
        return WbResponse.text_stream(captures, content_type='text/plain')
}
#head_insert_func = self.get_head_insert_func(wbrequest, cdx)
head_insert_func = self.head_insert_view.create_insert_func(wbrequest,
cdx)
result = self.rewriter.rewrite_content(wbrequest.urlrewriter,
status_headers,
stream,
head_insert_func=head_insert_func,
urlkey=urlkey)
status_headers, gen, is_rewritten = result
return WbResponse(status_headers, gen)
params = self.extract_params_from_wsgi_env(wbrequest.env)
try:
cdx_iter = self.index_handler.load_cdx(wbrequest, params)
except NotFoundException:
msg = 'No Captures found for: ' + params.get('url')
if params.get('output') == 'json':
msg = json.dumps(dict(error=msg))
content_type='application/json'
else:
content_type='text/plain'
return WbResponse.text_response(msg, content_type=content_type,
status='404 Not Found')
return WbResponse.text_stream(cdx_iter,
content_type='text/plain')
self.content_loader = content_loader
framed = config.get('framed_replay')
self.content_rewriter = RewriteContent(is_framed_replay=framed)
self.head_insert_view = HeadInsertView.init_from_config(config)
self.buffer_response = config.get('buffer_response', True)
self.redir_to_exact = config.get('redir_to_exact', True)
memento = config.get('enable_memento', False)
if memento:
self.response_class = MementoResponse
else:
self.response_class = WbResponse
self.enable_range_cache = config.get('enable_ranges', True)
self._reporter = config.get('reporter')
def render_response(self, wbrequest, cdx_lines, **kwargs):
    """Stream the timemap built from ``cdx_lines`` for ``wbrequest``.

    ``make_timemap`` produces the lines to send; they are streamed with
    the ``LINK_FORMAT`` content type.  Extra keyword arguments are
    accepted but not used.
    """
    timemap = make_timemap(wbrequest, cdx_lines)
    return WbResponse.text_stream(timemap, content_type=LINK_FORMAT)