    if flag & 4:  # FEXTRA: two-byte little-endian length, then that much data
        # bytearray indexing yields ints on both Python 2 and 3
        # (ord() would fail on Python 3, where bytes index to ints)
        extra_amt = bytearray(read_amt(zipped_file, 2))
        extra_amt = extra_amt[0] + 256 * extra_amt[1]
        if extra_amt:
            read_amt(zipped_file, extra_amt)
    if flag & 8:  # FNAME: NUL-terminated original file name
        while read_amt(zipped_file, 1) != b'\0':
            continue
    if flag & 16:  # FCOMMENT: NUL-terminated file comment
        while read_amt(zipped_file, 1) != b'\0':
            continue
    if flag & 2:  # FHCRC: two-byte CRC over the header
        read_amt(zipped_file, 2)
    return UnzipWrapper(zipped_file)
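# A minimal, self-contained sketch (not mechanize code) of the fixed
# ten-byte gzip member header that precedes the optional fields skipped
# above; layout per RFC 1952. The function name is hypothetical.
import struct

def parse_gzip_fixed_header(fileobj):
    magic, method, flag, mtime, xfl, os_code = struct.unpack(
        '<HBBIBB', fileobj.read(10))
    if magic != 0x8b1f or method != 8:  # 8 == deflate
        raise ValueError('not a gzip member')
    return flag  # FTEXT=1, FHCRC=2, FEXTRA=4, FNAME=8, FCOMMENT=16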
class HTTPGzipProcessor(BaseHandler):
handler_order = 200 # response processing before HTTPEquivProcessor
def __init__(self, request_gzip=False):
self.request_gzip = request_gzip
def __copy__(self):
return self.__class__(self.request_gzip)
def http_request(self, request):
if self.request_gzip:
existing = [
x.strip()
for x in request.get_header('Accept-Encoding', '').split(',')
]
            if not any('gzip' in x for x in existing):
                existing.append('gzip')
            # write the header back, dropping the empty entry left by
            # split(',') when no Accept-Encoding header was present
            request.add_header(
                'Accept-Encoding', ', '.join(filter(None, existing)))
        return request
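# Usage sketch (URL is a placeholder): install the processor above so
# every request advertises gzip support.
import mechanize
opener = mechanize.build_opener(HTTPGzipProcessor(request_gzip=True))
response = opener.open('http://example.com/')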
    # AbstractBasicAuthHandler: build Basic credentials and retry once
    def retry_http_basic_auth(self, host, req, realm):
        user, pw = self.passwd.find_user_password(realm, host)
        if pw is None:
            return None
        raw = "%s:%s" % (user, pw)
        auth = 'Basic %s' % base64.b64encode(
            raw.encode('utf-8')).strip().decode('ascii')
        if req.headers.get(self.auth_header, None) == auth:
            return None  # same credentials already failed; give up
        newreq = copy.copy(req)
        newreq.add_header(self.auth_header, auth)
        newreq.visit = False  # keep the retry out of browser history
        return self.parent.open(newreq)
def __copy__(self):
return self.__class__(self.passwd.__copy__())
class HTTPBasicAuthHandler(AbstractBasicAuthHandler, BaseHandler):
auth_header = 'Authorization'
def http_error_401(self, req, fp, code, msg, headers):
url = req.get_full_url()
return self.http_error_auth_reqed('www-authenticate',
url, req, headers)
def __copy__(self):
return AbstractBasicAuthHandler.__copy__(self)
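# Usage sketch: mechanize.Browser installs HTTPBasicAuthHandler by
# default, so registering credentials is enough to answer a 401
# challenge (URL and credentials are placeholders).
import mechanize
br = mechanize.Browser()
br.add_password('http://example.com/protected/', 'user', 'secret')
br.open('http://example.com/protected/')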
class HTTPErrorProcessor(BaseHandler):
    """Process HTTP error responses."""
    handler_order = 1000  # after all other processing
def http_response(self, request, response):
code, msg, hdrs = response.code, response.msg, response.info()
# According to RFC 2616, "2xx" code indicates that the client's
# request was successfully received, understood, and accepted.
if not (200 <= code < 300):
# hardcoded http is NOT a bug
response = self.parent.error(
'http', request, response, code, msg, hdrs)
return response
https_response = http_response
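# Effect of the processor above, sketched: a non-2xx response is routed
# through the error chain and typically surfaces as an HTTPError, which
# doubles as a response object (URL is a placeholder).
import mechanize
try:
    mechanize.urlopen('http://example.com/missing')
except mechanize.HTTPError as err:
    print(err.code)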
class HTTPDefaultErrorHandler(BaseHandler):
def http_error_default(self, req, fp, code, msg, hdrs):
# why these error methods took the code, msg, headers args in the first
# place rather than a response object, I don't know, but to avoid
# multiple wrapping, we're discarding them
if isinstance(fp, HTTPError):
response = fp
else:
response = HTTPError(
req.get_full_url(), code, msg, hdrs, fp)
assert code == response.code
assert msg == response.msg
assert hdrs == response.hdrs
raise response
            # tail of a quote-aware comma-splitting loop: the part just
            # ended, so reset the accumulator and scan the next char
            part = ''
            continue
if cur == '"':
quote = True
part += cur
# append last part
if part:
res.append(part)
return list(filter(None, (part_.strip() for part_ in res)))
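# A self-contained sketch of the splitter whose tail appears above; the
# name split_on_commas and exact semantics are assumptions, not
# mechanize API. It splits a header value on commas, except inside
# double-quoted sections.
def split_on_commas(value):
    res, part, quote = [], '', False
    for cur in value:
        if cur == ',' and not quote:
            res.append(part)
            part = ''
            continue
        if cur == '"':
            quote = not quote
        part += cur
    if part:  # append last part
        res.append(part)
    return list(filter(None, (p.strip() for p in res)))

# split_on_commas('a="x,y", b') -> ['a="x,y"', 'b']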
class FileHandler(BaseHandler):
# Use local file or FTP depending on form of URL
def file_open(self, req):
url = req.get_selector()
if url[:2] == '//' and url[2:3] != '/':
req.type = 'ftp'
return self.parent.open(req)
else:
return self.open_local_file(req)
# names for the localhost
names = None
def get_names(self):
if FileHandler.names is None:
            try:
                names = tuple(
                    socket.gethostbyname_ex('localhost')[2] +
                    socket.gethostbyname_ex(socket.gethostname())[2])
            except socket.gaierror:
                names = (socket.gethostbyname('localhost'),)
            FileHandler.names = names
        return FileHandler.names
    # ... tail of HTTPSHandler.https_open: build a connection factory
    # that carries client-certificate details and the SSL context
conn_factory = partial(
HTTPSConnection, key_file=key_file,
cert_file=cert_file, context=self.ssl_context)
return self.do_open(conn_factory, req)
https_request = AbstractHTTPHandler.do_request_
def __copy__(self):
ans = self.__class__(self.client_cert_manager)
ans._debuglevel = self._debuglevel
ans.ssl_context = self.ssl_context
return ans
class HTTPCookieProcessor(BaseHandler):
"""Handle HTTP cookies.
Public attributes:
cookiejar: CookieJar instance
"""
def __init__(self, cookiejar=None):
if cookiejar is None:
cookiejar = CookieJar()
self.cookiejar = cookiejar
    def http_request(self, request):
        self.cookiejar.add_cookie_header(request)
        return request

    def http_response(self, request, response):
        self.cookiejar.extract_cookies(response, request)
        return response

    https_request = http_request
    https_response = http_response
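# Usage sketch: share one CookieJar so cookies set by one response are
# replayed on later requests (URLs are placeholders).
import mechanize
jar = mechanize.CookieJar()
opener = mechanize.build_opener(HTTPCookieProcessor(jar))
opener.open('http://example.com/login')
opener.open('http://example.com/account')  # sends cookies from the jar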
from __future__ import absolute_import
import logging
from ._response import response_seek_wrapper
from ._urllib2_fork import BaseHandler
class HTTPResponseDebugProcessor(BaseHandler):
handler_order = 900 # before redirections, after everything else
def http_response(self, request, response):
if not hasattr(response, "seek"):
response = response_seek_wrapper(response)
info = logging.getLogger("mechanize.http_responses").info
try:
info(response.read())
finally:
response.seek(0)
info("*****************************************************")
return response
https_response = http_response
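# Usage sketch: the processor logs to the "mechanize.http_responses"
# logger at INFO, so attach a handler to see the bodies (the
# "mechanize.http_redirects" logger used further below works the same way).
import logging
import sys
logger = logging.getLogger("mechanize.http_responses")
logger.addHandler(logging.StreamHandler(sys.stderr))
logger.setLevel(logging.INFO)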
if self.rfp.can_fetch(ua, request.get_full_url()):
return request
else:
# XXX This should really have raised URLError. Too late now...
factory = self.http_response_class or create_response_info
msg = b"request disallowed by robots.txt"
raise RobotExclusionError(
request,
request.get_full_url(),
403, msg,
factory(BytesIO()), BytesIO(msg))
https_request = http_request
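# Usage sketch: mechanize.Browser performs the robots.txt check above
# by default; it can be disabled per browser instance.
import mechanize
br = mechanize.Browser()
br.set_handle_robots(False)  # skip robots.txt; never raise RobotExclusionError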
class HTTPRefererProcessor(BaseHandler):
"""Add Referer header to requests.
This only makes sense if you use each RefererProcessor for a single
chain of requests only (so, for example, if you use a single
HTTPRefererProcessor to fetch a series of URLs extracted from a single
page, this will break).
There's a proper implementation of this in mechanize.Browser.
"""
def __init__(self):
self.referer = None
def http_request(self, request):
        if ((self.referer is not None) and
                not request.has_header("Referer")):
            request.add_header("Referer", self.referer)
        return request

    def http_response(self, request, response):
        self.referer = response.geturl()
        return response

    https_request = http_request
    https_response = http_response
class HTTPRedirectHandler(BaseHandler):
# maximum number of redirections to any single URL
# this is needed because of the state that cookies introduce
max_repeats = 4
# maximum total number of redirections (regardless of URL) before
# assuming we're in a loop
max_redirections = 10
# Implementation notes:
# To avoid the server sending us into an infinite loop, the request
# object needs to track what URLs we have already seen. Do this by
# adding a handler-specific attribute to the Request object. The value
# of the dict is used to count the number of times the same URL has
# been visited. This is needed because visiting the same URL twice
# does not necessarily imply a loop, thanks to state introduced by
# cookies.
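# A minimal sketch (names assumed, not the handler's exact code) of the
# loop detection described in the notes above: count visits per URL in
# a dict stored on the Request and give up past the limits.
def check_redirect_loop(request, new_url, max_repeats=4, max_redirections=10):
    visited = getattr(request, 'redirect_dict', None)
    if visited is None:
        visited = request.redirect_dict = {}
    visited[new_url] = visited.get(new_url, 0) + 1
    if (visited[new_url] >= max_repeats or
            len(visited) >= max_redirections):
        raise ValueError('redirect loop')  # the real handler raises HTTPError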
class ProxyBasicAuthHandler(AbstractBasicAuthHandler, BaseHandler):
auth_header = 'Proxy-authorization'
def http_error_407(self, req, fp, code, msg, headers):
# http_error_auth_reqed requires that there is no userinfo component in
# authority. Assume there isn't one, since urllib2 does not (and
# should not, RFC 3986 s. 3.2.1) support requests for URLs containing
# userinfo.
authority = req.get_host()
return self.http_error_auth_reqed('proxy-authenticate',
authority, req, headers)
def __copy__(self):
return AbstractBasicAuthHandler.__copy__(self)
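# Usage sketch: answering 407 challenges from an HTTP proxy with
# mechanize.Browser (proxy host and credentials are placeholders).
import mechanize
br = mechanize.Browser()
br.set_proxies({'http': 'proxy.example.com:3128'})
br.add_proxy_password('user', 'secret')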
class HTTPRedirectDebugProcessor(BaseHandler):
def http_request(self, request):
if hasattr(request, "redirect_dict"):
info = logging.getLogger("mechanize.http_redirects").info
info("redirecting from %s to %s", request.get_origin_req_host(), request.get_full_url())
return request