Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def serve_record(self, environ, coll='$root', url=''):
"""Serve a URL's content from a WARC/ARC record in replay mode or from the live web in
live, proxy, and record mode.
:param dict environ: The WSGI environment dictionary for the request
:param str coll: The name of the collection the record is to be served from
:param str url: The URL for the corresponding record to be served if it exists
:return: WbResponse containing the contents of the record/URL
:rtype: WbResponse
"""
if coll in self.warcserver.list_fixed_routes():
return WbResponse.text_response('Error: Can Not Record Into Custom Collection "{0}"'.format(coll))
return self.serve_content(environ, coll, url, record=True)
coll_config = self.get_coll_config(coll)
metadata = coll_config.get('metadata')
view = BaseInsertView(self.rewriterapp.jinja_env, 'search.html')
wb_prefix = environ.get('SCRIPT_NAME', '')
if wb_prefix:
wb_prefix += '/'
content = view.render_to_string(environ,
wb_prefix=wb_prefix,
coll=coll,
coll_config=coll_config,
metadata=metadata)
return WbResponse.text_response(content, content_type='text/html; charset="utf-8"')
def format_response(self, response, wb_url, full_prefix, is_timegate, is_proxy):
memento_ts = None
if not isinstance(response, WbResponse):
content_type = 'text/html'
# if not replay outer frame, specify utf-8 charset
if not self.is_framed_replay(wb_url):
content_type += '; charset=utf-8'
else:
memento_ts = wb_url.timestamp
response = WbResponse.text_response(response, content_type=content_type)
if self.enable_memento and response.status_headers.statusline.startswith('200'):
self._add_memento_links(wb_url.url, full_prefix, None, memento_ts,
response.status_headers, is_timegate, is_proxy)
return response
def _not_found_response(self, environ, url):
resp = self.not_found_view.render_to_string(environ, url=url)
return WbResponse.text_response(resp, status='404 Not Found', content_type='text/html')
def json_response(obj, status='200 OK', content_type='application/json; charset=utf-8'):
"""Utility method for constructing a JSON response.
:param dict obj: The dictionary to be serialized in JSON format
:param str content_type: The content-type of the response
:param str status: The HTTP status line
:return: WbResponse JSON response
:rtype: WbResponse
"""
return WbResponse.text_response(json.dumps(obj), status, content_type)
text = res.text
if not res.text:
status = '404 Not Found'
elif res.status_code:
status = str(res.status_code) + ' ' + res.reason
if res.status_code == 200 and output == 'link':
timegate, timemap = self._get_timegate_timemap(wb_url.url, full_prefix, wb_url.mod)
text = MementoUtils.wrap_timemap_header(wb_url.url,
timegate,
timemap,
res.text)
return WbResponse.text_response(text,
content_type=content_type,
status=status)
def _error_response(self, environ, wbe):
status = wbe.status()
resp = self.error_view.render_to_string(environ,
err_msg=wbe.url,
err_details=wbe.msg,
err_status=wbe.status_code)
return WbResponse.text_response(resp, status=status, content_type='text/html')