Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def update_url_credentials(base_url, other_url):
    """Carry the network location of `base_url` over to `other_url`.

    Both URLs are parsed. When they share the same hostname and port, the
    result is `other_url` with its `netloc` replaced by the `netloc` of
    `base_url` (including any credentials embedded in it). When they point
    to different servers, `other_url` is returned re-serialized with no
    further change.
    """
    source = compat.urlparse(base_url)
    target = compat.urlparse(other_url)

    same_server = (
        source.hostname == target.hostname and source.port == target.port)
    if same_server:
        # Only urls living on the same server inherit the base `netloc`
        target = target._replace(netloc=source.netloc)
    return target.geturl()
def get_opener():
    """Return the urllib3 manager used to issue HTTP requests.

    A plain `urllib3.PoolManager` is returned unless the `http_proxy`
    environment variable holds a non-empty value. In that case the result
    is a `urllib3.ProxyManager` pointing at that proxy, carrying the
    headers produced by `util.get_auth_info_from_url(..., proxy=True)`.
    """
    proxy = os.getenv('http_proxy')
    if not proxy:
        # No proxy configured: talk to the servers directly
        return urllib3.PoolManager()

    proxy_url = compat.urlparse(proxy).geturl()
    headers = util.get_auth_info_from_url(proxy, proxy=True)
    return urllib3.ProxyManager(proxy_url=proxy_url, proxy_headers=headers)
def get_page(self, url):
# Retrieve `url` through `self.opener` and start processing the response
# when its content type is HTML.
# NOTE(review): this excerpt stops inside the `if encoding:` branch below;
# the remainder of the body (and whatever it returns on success) is not
# visible here and the code as shown has lost its indentation.
# http://peak.telecommunity.com/DevCenter/EasyInstall#package-index-api
scheme, netloc, path, _, _, _ = compat.urlparse(url)
# A `file` url whose path is a directory is redirected to the
# `index.html` inside that directory
if scheme == 'file' and os.path.isdir(url2pathname(path)):
url = urljoin(ensure_slash(url), 'index.html')
# The `retrieve()` method follows any eventual redirects, so the
# initial url might be different from the final one
try:
response, final_url = self.opener.retrieve(url)
except urllib3.exceptions.MaxRetryError:
# Retries exhausted: give up on this page and return None
return
content_type = response.headers.get('content-type', '')
# Only responses whose content type matches
# `locators.HTML_CONTENT_TYPE` are processed any further
if locators.HTML_CONTENT_TYPE.match(content_type):
data = response.data
encoding = response.headers.get('content-encoding')
if encoding:
# Look up the decoder registered for this content-encoding; an
# unknown encoding raises `KeyError` on purpose
decoder = self.decoders[encoding] # fail if not found
def get_page(self, url):
# Retrieve `url` with `http_retrieve()` and, when the response is HTML,
# decode its body according to the `content-encoding` header.
# NOTE(review): this excerpt appears to end right after the decoding
# step; no return statement for the success path is visible here and the
# code as shown has lost its indentation.
# http://peak.telecommunity.com/DevCenter/EasyInstall#package-index-api
scheme, netloc, path, _, _, _ = compat.urlparse(url)
# A `file` url whose path is a directory is redirected to the
# `index.html` inside that directory
if scheme == 'file' and os.path.isdir(url2pathname(path)):
url = compat.urljoin(ensure_slash(url), 'index.html')
# The `retrieve()` method follows any eventual redirects, so the
# initial url might be different from the final one
try:
response, final_url = http_retrieve(self.opener, url)
except urllib3.exceptions.MaxRetryError:
# Retries exhausted: give up on this page and return None
return
content_type = response.headers.get('content-type', '')
# Only responses whose content type matches
# `locators.HTML_CONTENT_TYPE` are processed any further
if locators.HTML_CONTENT_TYPE.match(content_type):
data = response.data
encoding = response.headers.get('content-encoding')
if encoding:
# Look up the decoder registered for this content-encoding; an
# unknown encoding raises `KeyError` on purpose
decoder = self.decoders[encoding] # fail if not found
data = decoder(data)
# `encoding` is reused from here on to hold the text encoding name
encoding = 'utf-8'
def handle(self, requester, data):
    """Upload a wheel file to the server with an HTTP `PUT` request.

    `data` must provide the keys `wheel` (path of the file to upload),
    `server` (base url of the destination) and `requirement`. The
    `requester` argument is not used by this method.

    The destination url is `server` joined with `p/<file name>`, and the
    request carries the headers built by `get_auth_info_from_url()` for
    that url.

    Returns a dict with the `upload_url` and the `requirement` taken from
    `data`.
    """
    # Preparing the url to PUT the file
    wheel = data.get('wheel')
    server = data.get('server')
    file_name = os.path.basename(wheel)
    url = compat.urljoin(server, 'p/{0}'.format(file_name))

    # Read the wheel through a context manager so the file handle is
    # closed deterministically instead of being left to the garbage
    # collector.
    with io.open(wheel, 'rb') as wheel_file:
        contents = wheel_file.read()

    # Sending the file to the server. Both `method` and `url` parameters
    # for calling `request_encode_body()` must be `str()` instances, not
    # unicode.
    self.opener.request_encode_body(
        b'PUT', bytes(url), {file_name: (file_name, contents)},
        headers=get_auth_info_from_url(url))
    return {'upload_url': url, 'requirement': data['requirement']}
def _download_http(self, url):
    """Download `url` over HTTP and register its content in the index.

    Redirects reported by `http_retrieve()` are honoured: when a final url
    is returned, it replaces `url` for the rest of the method.

    Returns a `(field_name, value)` pair where `field_name` is `'wheel'`
    when the url ends with `.whl` and `'tarball'` otherwise, and `value`
    is whatever `self.index.from_data()` returns for the downloaded
    content.

    Raises `ReportableError` when the response status is not 200.
    """
    response, final_url = http_retrieve(self.opener, url)
    if final_url:
        url = final_url
    if response.status != 200:
        # Non-standard status codes are missing from the `responses`
        # table; fall back to a generic reason so that the caller still
        # gets a `ReportableError` rather than a `KeyError`.
        reason = compat.httplib.responses.get(response.status, 'Unknown')
        raise ReportableError(
            'Failed to download url `{0}\': {1} ({2})'.format(
                url,
                response.status,
                reason,
            ))

    # Define what kind of package we've got
    field_name = 'wheel' if url.endswith('.whl') else 'tarball'

    # Now that we're sure that our request was successful
    header = response.headers.get('content-disposition', '')
    file_name = re.findall(r'filename=\"?([^;\"]+)', header)

    # Prefer the name announced in `content-disposition`; without it the
    # url itself is used as the file name.
    return field_name, self.index.from_data(
        file_name[0] if file_name else url,
        response.read(cache_content=True, decode_content=False))
def _get_project(self, name):
    """Fetch the project page for `name`, trying its spelling variants.

    Names containing `-` or `_` are looked up twice: first with every
    underscore turned into a hyphen, then with every hyphen turned into an
    underscore. Hyphens go first because far more packages use them than
    underscores. Any other name is looked up as is.

    Returns the first truthy result of `self._fetch()`, or `None` when no
    candidate yields one.
    """
    if '-' in name or '_' in name:
        candidates = (name.replace('_', '-'), name.replace('-', '_'))
    else:
        candidates = [name]

    # Walk through every spelling the package may be published under
    for candidate in candidates:
        quoted = compat.quote(candidate)
        page_url = compat.urljoin(self.base_url, '{0}/'.format(quoted))
        match = self._fetch(page_url, candidate)
        if match:
            return match
    return None
def _get_project(self, name):
    """Fetch the project page for `name` under `self.base_url`.

    The quoted name followed by a trailing slash is joined to the base
    url, and the result of `self._fetch()` for that url is returned.
    """
    project_path = '%s/' % compat.quote(name)
    project_url = urljoin(self.base_url, project_path)
    return self._fetch(project_url, name)
def get_auth_info_from_url(url, proxy=False):
    """Build HTTP basic-auth headers from the credentials found in `url`.

    Returns an empty dict when `url` carries no username. Otherwise the
    result depends on `proxy`:

    * `proxy=False` (default): the headers produced by
      `urllib3.util.make_headers(basic_auth=...)`;
    * `proxy=True`: a dict holding a single `proxy-authorization` header
      with the base64-encoded credentials.
    """
    parsed = compat.urlparse(url)
    if not parsed.username:
        return {}

    # `password` is `None` for urls like `http://user@host`. Use an empty
    # password in that case instead of formatting the literal text "None"
    # into the credentials.
    auth = '{0}:{1}'.format(parsed.username, parsed.password or '')

    # The caller is not interested in proxy headers
    if not proxy:
        return urllib3.util.make_headers(basic_auth=auth)

    # Proxy-Authentication support
    return {'proxy-authorization':
            'Basic ' + b64encode(auth.encode('utf-8')).decode('ascii')}