Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
content_type = _get_content_type(url, session=session)
if content_type.lower().startswith('text/html'):
break
else:
logger.debug(
'Skipping page %s because of Content-Type: %s',
link,
content_type,
)
return
logger.debug('Getting page %s', url)
# Tack index.html onto file:// URLs that point to directories
(scheme, netloc, path, params, query, fragment) = \
urllib_parse.urlparse(url)
if (scheme == 'file' and
os.path.isdir(urllib_request.url2pathname(path))):
# add trailing slash if not present so urljoin doesn't trim
# final segment
if not url.endswith('/'):
url += '/'
url = urllib_parse.urljoin(url, 'index.html')
logger.debug(' file: URL is directory, getting %s', url)
resp = session.get(
url,
headers={
"Accept": "text/html",
# We don't want to blindly return cached data for
# /simple/, because authors generally expect that
# twine upload && pip install will function, but if
def _validate_secure_origin(self, logger, location):
# Determine if this url used a secure transport mechanism
parsed = urllib_parse.urlparse(str(location))
origin = (parsed.scheme, parsed.hostname, parsed.port)
# The protocol to use to see if the protocol matches.
# Don't count the repository type as part of the protocol: in
# cases such as "git+ssh", only use "ssh". (I.e., Only verify against
# the last scheme.)
protocol = origin[0].rsplit('+', 1)[-1]
# Determine if our origin is a secure origin by looking through our
# hardcoded list of secure origins, as well as any additional ones
# configured on this PackageFinder instance.
for secure_origin in (SECURE_ORIGINS + self.secure_origins):
if protocol != secure_origin[0] and secure_origin[0] != "*":
continue
try:
def _validate_secure_origin(self, logger, location):
# Determine if this url used a secure transport mechanism
parsed = urllib_parse.urlparse(str(location))
origin = (parsed.scheme, parsed.hostname, parsed.port)
# The protocol to use to see if the protocol matches.
# Don't count the repository type as part of the protocol: in
# cases such as "git+ssh", only use "ssh". (I.e., Only verify against
# the last scheme.)
protocol = origin[0].rsplit('+', 1)[-1]
# Determine if our origin is a secure origin by looking through our
# hardcoded list of secure origins, as well as any additional ones
# configured on this PackageFinder instance.
for secure_origin in (SECURE_ORIGINS + self.secure_origins):
if protocol != secure_origin[0] and secure_origin[0] != "*":
continue
try:
def _validate_secure_origin(self, logger, location):
# Determine if this url used a secure transport mechanism
parsed = urllib_parse.urlparse(str(location))
origin = (parsed.scheme, parsed.hostname, parsed.port)
# The protocol to use to see if the protocol matches.
# Don't count the repository type as part of the protocol: in
# cases such as "git+ssh", only use "ssh". (I.e., Only verify against
# the last scheme.)
protocol = origin[0].rsplit('+', 1)[-1]
# Determine if our origin is a secure origin by looking through our
# hardcoded list of secure origins, as well as any additional ones
# configured on this PackageFinder instance.
for secure_origin in (SECURE_ORIGINS + self.secure_origins):
if protocol != secure_origin[0] and secure_origin[0] != "*":
continue
try:
def _validate_secure_origin(self, logger, location):
# Determine if this url used a secure transport mechanism
parsed = urllib_parse.urlparse(str(location))
origin = (parsed.scheme, parsed.hostname, parsed.port)
# The protocol to use to see if the protocol matches.
# Don't count the repository type as part of the protocol: in
# cases such as "git+ssh", only use "ssh". (I.e., Only verify against
# the last scheme.)
protocol = origin[0].rsplit('+', 1)[-1]
# Determine if our origin is a secure origin by looking through our
# hardcoded list of secure origins, as well as any additional ones
# configured on this PackageFinder instance.
for secure_origin in (SECURE_ORIGINS + self.secure_origins):
if protocol != secure_origin[0] and secure_origin[0] != "*":
continue
try:
def _remove_md5_fragment(location):
if not location:
return ''
parsed = urllib.parse.urlparse(location)
if parsed[-1].startswith('md5='):
return urllib.parse.urlunparse(parsed[:-1] + ('',))
return location
def _clean_link(url):
# type: (str) -> str
"""Makes sure a link is fully encoded. That is, if a ' ' shows up in
the link, it will be rewritten to %20 (while not over-quoting
% or other characters)."""
# Split the URL into parts according to the general structure
# `scheme://netloc/path;parameters?query#fragment`. Note that the
# `netloc` can be empty and the URI will then refer to a local
# filesystem path.
result = urllib_parse.urlparse(url)
# In both cases below we unquote prior to quoting to make sure
# nothing is double quoted.
if result.netloc == "":
# On Windows the path part might contain a drive letter which
# should not be quoted. On Linux where drive letters do not
# exist, the colon should be quoted. We rely on urllib.request
# to do the right thing here.
path = urllib_request.pathname2url(
urllib_request.url2pathname(result.path))
else:
# In addition to the `/` character we protect `@` so that
# revision strings in VCS URLs are properly parsed.
path = urllib_parse.quote(urllib_parse.unquote(result.path), safe="/@")
return urllib_parse.urlunparse(result._replace(path=path))
def handle_401(self, resp, **kwargs):
# We only care about 401 responses, anything else we want to just
# pass through the actual response
if resp.status_code != 401:
return resp
# We are not able to prompt the user, so simply return the response
if not self.prompting:
return resp
parsed = urllib_parse.urlparse(resp.url)
# Prompt the user for a new username and password
username = six.moves.input("User for %s: " % parsed.netloc)
password = getpass.getpass("Password: ")
# Store the new username and password to use for future requests
if username or password:
self.passwords[parsed.netloc] = (username, password)
# Consume content and release the original connection to allow our new
# request to reuse the same one.
resp.content
resp.raw.release_conn()
# Add our new username and password to the request
req = HTTPBasicAuth(username or "", password or "")(resp.request)
def _clean_link(url):
# type: (str) -> str
"""Makes sure a link is fully encoded. That is, if a ' ' shows up in
the link, it will be rewritten to %20 (while not over-quoting
% or other characters)."""
# Split the URL into parts according to the general structure
# `scheme://netloc/path;parameters?query#fragment`. Note that the
# `netloc` can be empty and the URI will then refer to a local
# filesystem path.
result = urllib_parse.urlparse(url)
# In both cases below we unquote prior to quoting to make sure
# nothing is double quoted.
if result.netloc == "":
# On Windows the path part might contain a drive letter which
# should not be quoted. On Linux where drive letters do not
# exist, the colon should be quoted. We rely on urllib.request
# to do the right thing here.
path = urllib_request.pathname2url(
urllib_request.url2pathname(result.path))
else:
# In addition to the `/` character we protect `@` so that
# revision strings in VCS URLs are properly parsed.
path = urllib_parse.quote(urllib_parse.unquote(result.path), safe="/@")
return urllib_parse.urlunparse(result._replace(path=path))
return
# This is a dirty hack to prevent installing Binary Wheels from
# PyPI unless it is a Windows or Mac Binary Wheel. This is
# paired with a change to PyPI disabling uploads for the
# same. Once we have a mechanism for enabling support for
# binary wheels on linux that deals with the inherent problems
# of binary distribution this can be removed.
comes_from = getattr(link, "comes_from", None)
if (
(
not platform.startswith('win') and not
platform.startswith('macosx') and not
platform == 'cli'
) and
comes_from is not None and
urllib_parse.urlparse(
comes_from.url
).netloc.endswith(PyPI.netloc)):
if not wheel.supported(tags=supported_tags_noarch):
self._log_skipped_link(
link,
"it is a pypi-hosted binary "
"Wheel on an unsupported platform",
)
return
version = wheel.version
# This should be up by the search.ok_binary check, but see issue 2700.
if "source" not in search.formats and ext != wheel_ext:
self._log_skipped_link(
link, 'No sources permitted for %s' % search.supplied)
return