Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _add_referer_header(self, request, origin_request=True):
if self.request is None:
return request
scheme = request.get_type()
original_scheme = self.request.get_type()
if scheme not in ["http", "https"]:
return request
if not origin_request and not self.request.has_header("Referer"):
return request
if (self._handle_referer and original_scheme in ["http", "https"] and
not (original_scheme == "https" and scheme != "https")):
# strip URL fragment (RFC 2616 14.36)
parts = _rfc3986.urlsplit(self.request.get_full_url())
parts = parts[:-1] + (None, )
referer = _rfc3986.urlunsplit(parts)
request.add_unredirected_header("Referer", referer)
return request
def _add_referer_header(self, request, origin_request=True):
if self.request is None:
return request
scheme = request.get_type()
original_scheme = self.request.get_type()
if scheme not in ["http", "https"]:
return request
if not origin_request and not self.request.has_header("Referer"):
return request
if (self._handle_referer and original_scheme in ["http", "https"] and
not (original_scheme == "https" and scheme != "https")):
# strip URL fragment (RFC 2616 14.36)
parts = _rfc3986.urlsplit(self.request.get_full_url())
parts = parts[:-1] + (None, )
referer = _rfc3986.urlunsplit(parts)
request.add_unredirected_header("Referer", referer)
return request
def is_html_file_extension(url, allow_xhtml):
if url is None:
return False
ext = os.path.splitext(_rfc3986.urlsplit(url)[2])[1]
html_exts = [".htm", ".html"]
if allow_xhtml:
html_exts += [".xhtml"]
return ext in html_exts
def _add_referer_header(self, request, origin_request=True):
if self.request is None:
return request
scheme = request.get_type()
original_scheme = self.request.get_type()
if scheme not in ["http", "https"]:
return request
if not origin_request and not self.request.has_header("Referer"):
return request
if (self._handle_referer and original_scheme in ["http", "https"] and
not (original_scheme == "https" and scheme != "https")):
# strip URL fragment (RFC 2616 14.36)
parts = _rfc3986.urlsplit(self.request.get_full_url())
parts = parts[:-1] + (None, )
referer = _rfc3986.urlunsplit(parts)
request.add_unredirected_header("Referer", referer)
return request
def get_selector(self):
scheme, authority, path, query, fragment = _rfc3986.urlsplit(
self.__r_host)
if path == "":
path = "/" # RFC 2616, section 3.2.2
fragment = None # RFC 3986, section 3.5
return _rfc3986.urlunsplit([scheme, authority, path, query, fragment])
def _add_referer_header(self, request, origin_request=True):
if self.request is None:
return request
scheme = request.get_type()
original_scheme = self.request.get_type()
if scheme not in ["http", "https"]:
return request
if not origin_request and not self.request.has_header("Referer"):
return request
if (self._handle_referer and original_scheme in ["http", "https"] and
not (original_scheme == "https" and scheme != "https")):
# strip URL fragment (RFC 2616 14.36)
parts = _rfc3986.urlsplit(self.request.get_full_url())
parts = parts[:-1] + (None, )
referer = _rfc3986.urlunsplit(parts)
request.add_unredirected_header("Referer", referer)
return request
def http_error_302(self, req, fp, code, msg, headers):
# Some servers (incorrectly) return multiple Location headers
# (so probably same goes for URI). Use first header.
if 'location' in headers:
newurl = headers.getheaders('location')[0]
elif 'uri' in headers:
newurl = headers.getheaders('uri')[0]
else:
return
newurl = _rfc3986.clean_url(newurl)
newurl = _rfc3986.urljoin(req.get_full_url(), newurl)
# XXX Probably want to forget about the state of the current
# request, although that might interact poorly with other
# handlers that also use handler-specific request attributes
new = self.redirect_request(req, fp, code, msg, headers, newurl)
if new is None:
return
# loop detection
# .redirect_dict has a key url if url was previously visited.
if hasattr(req, 'redirect_dict'):
visited = new.redirect_dict = req.redirect_dict
if (visited.get(newurl, 0) >= self.max_repeats or
len(visited) >= self.max_redirections):
raise HTTPError(req.get_full_url(), code,
def http_error_302(self, req, fp, code, msg, headers):
# Some servers (incorrectly) return multiple Location headers
# (so probably same goes for URI). Use first header.
if 'location' in headers:
newurl = headers.getheaders('location')[0]
elif 'uri' in headers:
newurl = headers.getheaders('uri')[0]
else:
return
newurl = _rfc3986.clean_url(newurl)
newurl = _rfc3986.urljoin(req.get_full_url(), newurl)
# XXX Probably want to forget about the state of the current
# request, although that might interact poorly with other
# handlers that also use handler-specific request attributes
new = self.redirect_request(req, fp, code, msg, headers, newurl)
if new is None:
return
# loop detection
# .redirect_dict has a key url if url was previously visited.
if hasattr(req, 'redirect_dict'):
visited = new.redirect_dict = req.redirect_dict
if (visited.get(newurl, 0) >= self.max_repeats or
len(visited) >= self.max_redirections):
raise HTTPError(req.get_full_url(), code,
def clean_refresh_url(url):
# e.g. Firefox 1.5 does (something like) this
if ((url.startswith('"') and url.endswith('"')) or
(url.startswith("'") and url.endswith("'"))):
url = url[1:-1]
return _rfc3986.clean_url(url, 'utf-8') # XXX encoding
def __init__(self, url, data=None, headers={},
origin_req_host=None, unverifiable=False, visit=None,
timeout=_sockettimeout._GLOBAL_DEFAULT_TIMEOUT,
method=None):
# In mechanize 0.2, the interpretation of a unicode url argument will
# change: A unicode url argument will be interpreted as an IRI, and a
# bytestring as a URI. For now, we accept unicode or bytestring. We
# don't insist that the value is always a URI (specifically, must only
# contain characters which are legal), because that might break working
# code (who knows what bytes some servers want to see, especially with
# browser plugins for internationalised URIs).
if not _rfc3986.is_clean_uri(url):
warn("url argument is not a URI "
"(contains illegal characters) %r" % url)
if isinstance(data, dict):
data = {as_utf8(k): as_utf8(v) for k, v in iteritems(data)}
data = urlencode(data)
data = data or None
if data and method == 'GET':
url += ('&' if '?' in url else '?') + data
data = None
_urllib2_fork.Request.__init__(self, url, data, headers, method=method)
self.selector = None
self.visit = visit
self.timeout = timeout