Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Walk `source` with os.walk and, for every file that passes `filter`, call
# ensure_dir on the corresponding directory under `dest`.
# NOTE(review): this excerpt has lost its indentation and stops before any
# file is copied; `fixer`, `srcfile` and `destfile` are not used in the lines
# visible here. `data_dirs`, `info_dir` and `req` are not defined in this
# block -- presumably they come from an enclosing scope; confirm.
def clobber(source, dest, is_base, fixer=None, filter=None):
ensure_dir(dest) # common for the 'include' path
for dir, subdirs, files in os.walk(source):
# Path of the directory being visited, relative to `source`.
basedir = dir[len(source):].lstrip(os.path.sep)
destdir = os.path.join(dest, basedir)
# For a base tree, skip any directory whose first path component ends in
# '.data'.
if is_base and basedir.split(os.path.sep, 1)[0].endswith('.data'):
continue
for s in subdirs:
destsubdir = os.path.join(dest, basedir, s)
# At the top level of a base tree, '*.data' subdirectory names are recorded
# in data_dirs. A '*.dist-info' subdirectory whose canonicalized name starts
# with the canonicalized requirement name is recorded in info_dir, after
# asserting that none was recorded before.
if is_base and basedir == '' and destsubdir.endswith('.data'):
data_dirs.append(s)
continue
elif (is_base and
s.endswith('.dist-info') and
canonicalize_name(s).startswith(
canonicalize_name(req.name))):
assert not info_dir, ('Multiple .dist-info directories: ' +
destsubdir + ', ' +
', '.join(info_dir))
info_dir.append(destsubdir)
for f in files:
# Skip unwanted files
if filter and filter(f):
continue
srcfile = os.path.join(dir, f)
destfile = os.path.join(dest, basedir, f)
# directory creation is lazy and after the file filtering above
# to ensure we don't install empty dirs; empty dirs can't be
# uninstalled.
ensure_dir(destdir)
# NOTE(review): the statements from here down to the cut-off 'copyfile'
# comment repeat, without a `def` header, the body of the `clobber` excerpt
# that precedes them. This looks like an extraction duplicate; confirm before
# relying on it.
ensure_dir(dest) # common for the 'include' path
for dir, subdirs, files in os.walk(source):
basedir = dir[len(source):].lstrip(os.path.sep)
destdir = os.path.join(dest, basedir)
if is_base and basedir.split(os.path.sep, 1)[0].endswith('.data'):
continue
for s in subdirs:
destsubdir = os.path.join(dest, basedir, s)
if is_base and basedir == '' and destsubdir.endswith('.data'):
data_dirs.append(s)
continue
elif (is_base and
s.endswith('.dist-info') and
canonicalize_name(s).startswith(
canonicalize_name(req.name))):
assert not info_dir, ('Multiple .dist-info directories: ' +
destsubdir + ', ' +
', '.join(info_dir))
info_dir.append(destsubdir)
for f in files:
# Skip unwanted files
if filter and filter(f):
continue
srcfile = os.path.join(dir, f)
destfile = os.path.join(dest, basedir, f)
# directory creation is lazy and after the file filtering above
# to ensure we don't install empty dirs; empty dirs can't be
# uninstalled.
ensure_dir(destdir)
# copyfile (called below) truncates the destination if it
def handle_mutual_excludes(value, target, other):
    # type: (str, Optional[Set], Optional[Set]) -> None
    """Apply a comma-separated selection to two mutually exclusive sets.

    ``value`` is split on commas and processed left to right:

    * ``:all:`` empties both sets, puts ``':all:'`` into ``target`` and
      discards every entry up to and including itself.  If no ``:none:``
      follows, processing stops there because ``:all:`` already covers
      any remaining names.
    * ``:none:`` empties ``target``.
    * any other entry is canonicalized, removed from ``other`` and added
      to ``target``.

    Both sets are modified in place; nothing is returned.  ``target`` and
    ``other`` must be real set objects, since ``clear``/``add``/``discard``
    are called on them unconditionally.
    """
    new = value.split(',')
    while ':all:' in new:
        other.clear()
        target.clear()
        target.add(':all:')
        # Only entries after this ':all:' can still have an effect.
        del new[:new.index(':all:') + 1]
        # Without a none, we want to discard everything as :all: covers it
        if ':none:' not in new:
            return
    for name in new:
        if name == ':none:':
            target.clear()
            continue
        name = canonicalize_name(name)
        other.discard(name)
        target.add(name)
# NOTE(review): headerless excerpt -- the enclosing `def` and the condition
# guarding the first log call are not visible. The visible branches return
# None, False or True; what each value means to the caller must be confirmed
# against the full function.
logger.info(
'Skipping %s, due to already being wheel.', req.name,
)
return None
if not autobuilding:
return False
# Editable requirements, and requirements without a source_dir, yield None.
if req.editable or not req.source_dir:
return None
if req.link and not req.link.is_artifact:
# VCS checkout. Build wheel just for this run.
return True
# None is also returned when "binary" is not among the formats allowed for
# this requirement's canonicalized name.
if "binary" not in format_control.get_allowed_formats(
canonicalize_name(req.name)):
logger.info(
"Skipping bdist_wheel for %s, due to binaries "
"being disabled for it.", req.name,
)
return None
link = req.link
base, ext = link.splitext()
# False when a cache is available and _contains_egg_info accepts the link's
# base name (the part before its extension).
if cache_available and _contains_egg_info(base):
return False
# Otherwise, build the wheel just for this run using the ephemeral
# cache since we are either in the case of e.g. a local directory, or
# no cache directory is available to use.
return True
# NOTE(review): headerless excerpt of a method (it uses `self`); it is cut off
# inside the final `page_versions.extend(` call. `index_url_loc`, `fl_url_loc`
# and `project_name` are defined outside the visible lines.
# We want to filter out any thing which does not have a secure origin.
url_locations = [
link for link in itertools.chain(
(Link(url) for url in index_url_loc),
(Link(url) for url in fl_url_loc),
)
if self._validate_secure_origin(logger, link)
]
logger.debug('%d location(s) to search for versions of %s:',
len(url_locations), project_name)
for location in url_locations:
logger.debug('* %s', location)
# The canonicalized project name selects the allowed formats; both are
# bundled with the original name into a Search object.
canonical_name = canonicalize_name(project_name)
formats = self.format_control.get_allowed_formats(canonical_name)
search = Search(project_name, canonical_name, formats)
find_links_versions = self._package_versions(
# We trust every directly linked archive in find_links
(Link(url, '-f') for url in self.find_links),
search
)
page_versions = []
for page in self._get_pages(url_locations, project_name):
# A page object without a `.url` attribute raises AttributeError in the
# debug call and is skipped.
try:
logger.debug('Analyzing links from page %s', page.url)
except AttributeError:
continue
with indent_log():
page_versions.extend(
def _create_whitelist(would_be_installed, package_set):
# type: (Set[str], PackageSet) -> Set[str]
packages_affected = set(would_be_installed)
for package_name in package_set:
if package_name in packages_affected:
continue
for req in package_set[package_name].requires:
if canonicalize_name(req.name) in packages_affected:
packages_affected.add(package_name)
break
return packages_affected
This function is needed since the canonicalized name does not necessarily
have the same length as the egg info's name part. An example::
>>> egg_info = 'foo__bar-1.0'
>>> canonical_name = 'foo-bar'
>>> _find_name_version_sep(egg_info, canonical_name)
8
"""
# NOTE(review): the `def` line and the opening of the docstring for this body
# are not visible in this excerpt; `egg_info` and `canonical_name` are its
# inputs.
# Project name and version must be separated by one single dash. Find all
# occurrences of dashes; if the string in front of it matches the canonical
# name, this is the one separating the name and version parts.
for i, c in enumerate(egg_info):
if c != "-":
continue
if canonicalize_name(egg_info[:i]) == canonical_name:
return i
# No dash had a prefix that canonicalizes to `canonical_name`.
raise ValueError("{} does not match {}".format(egg_info, canonical_name))
def mkurl_pypi_url(url):
    """Return the project page location under the index base ``url``.

    The canonicalized, URL-quoted project name is joined onto ``url`` with
    POSIX path semantics, and the result always ends in ``'/'``.

    ``project_name`` is not a parameter: it is read from the surrounding
    scope in which this helper is defined.
    """
    loc = posixpath.join(
        url,
        urllib_parse.quote(canonicalize_name(project_name)))
    # For maximum compatibility with easy_install, ensure the path
    # ends in a trailing slash.  Although this isn't in the spec
    # (and PyPI can handle it without the slash) some other index
    # implementations might break if they relied on easy_install's
    # behavior.
    if not loc.endswith('/'):
        loc = loc + '/'
    return loc
# NOTE(review): headerless excerpt of a generator (it uses `yield`);
# `req_files`, `line_req`, `req_file_path`, `installations` and `skip` are
# defined outside the visible lines.
req_files[line_req.name].append(req_file_path)
# Warn about requirements that were included multiple times (in a
# single requirements file or in different requirements files).
for name, files in six.iteritems(req_files):
if len(files) > 1:
logger.warning("Requirement %s included multiple times [%s]",
name, ', '.join(sorted(set(files))))
# Emit a marker line, then every installation whose canonicalized name is not
# in `skip`, ordered case-insensitively by name.
yield(
'## The following requirements were added by '
'pip freeze:'
)
for installation in sorted(
installations.values(), key=lambda x: x.name.lower()):
if canonicalize_name(installation.name) not in skip:
yield str(installation).rstrip()
def _simulate_installation_of(to_install, package_set):
# type: (List[InstallRequirement], PackageSet) -> Set[str]
"""Computes the version of packages after installing to_install.
"""
# Keep track of packages that were installed
installed = set()
# Modify it as installing requirement_set would (assuming no errors)
for inst_req in to_install:
dist = make_abstract_dist(inst_req).dist()
name = canonicalize_name(dist.key)
package_set[name] = PackageDetails(dist.version, dist.requires())
installed.add(name)
return installed