def gen_label(base, label):
    """If the supplied label is a path, generate a unique label based upon the
    label and the supplied base path; otherwise return the label unchanged."""
    if label.find(os.path.sep) == -1:
        return label

    label = label.strip("\"").strip("'")
    label = os.path.join(*(label.rstrip(os.path.sep).split(os.path.sep)))
    tail = os.path.split(label)[1]
    return "%s-%X" % (tail, abs(label.__hash__()))
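
# Minimal usage sketch (illustration only, not part of the original source).
# A plain name passes through unchanged, while a path-like label is reduced to
# its basename plus a hash-derived suffix, so labels built from different
# paths stay distinct. The exact suffix depends on Python's string hashing.
print(gen_label("/var/tmp", "distfiles"))
# -> "distfiles"
print(gen_label("/var/tmp", "/var/cache/distfiles"))
# -> something like "distfiles-1A2B3C4D"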
    if x.startswith(".") or x.endswith("~") or x.endswith(".bak"):
        continue
    templist.append(x)
fns = templist
del templist

space_separated = set(["CONFIG_PROTECT", "CONFIG_PROTECT_MASK"])
colon_separated = set(["ADA_INCLUDE_PATH", "ADA_OBJECTS_PATH",
    "CLASSPATH", "INFODIR", "INFOPATH", "KDEDIRS", "LDPATH", "MANPATH",
    "PATH", "PKG_CONFIG_PATH", "PRELINK_PATH", "PRELINK_PATH_MASK",
    "PYTHONPATH", "ROOTPATH"])

config_list = []

for x in fns:
    file_path = os.path.join(envd_dir, x)
    try:
        myconfig = getconfig(file_path, expand=False)
    except ParseError as e:
        writemsg("!!! '%s'\n" % str(e), noiselevel=-1)
        del e
        continue
    if myconfig is None:
        # broken symlink or file removed by a concurrent process
        writemsg("!!! File Not Found: '%s'\n" % file_path, noiselevel=-1)
        continue

    config_list.append(myconfig)
    if "SPACE_SEPARATED" in myconfig:
        space_separated.update(myconfig["SPACE_SEPARATED"].split())
        del myconfig["SPACE_SEPARATED"]
    if "COLON_SEPARATED" in myconfig:

            dodir(destdir)
            dofile(fullpath, os.path.join(destdir, basename))
        elif warn_on_skipped_files and ext not in unwarned_skipped_extensions and basename not in unwarned_skipped_files:
            skipped_files.append(fullpath)
    elif options.recurse and os.path.isdir(fullpath) and \
            basename not in options.disallowed_dirs:
        for i in _os.listdir(_unicode_encode(fullpath)):
            try:
                i = _unicode_decode(i, errors='strict')
            except UnicodeDecodeError:
                writemsg('dohtml: argument is not encoded as UTF-8: %s\n' %
                    _unicode_decode(i), noiselevel=-1)
                sys.exit(1)
            pfx = basename
            if prefix:
                pfx = os.path.join(prefix, pfx)
            install(i, dirname, options, pfx)
    elif not options.recurse and os.path.isdir(fullpath):
        global skipped_directories
        skipped_directories.append(fullpath)
        return False
    else:
        return False
    return True

inst_gid = int(mysettings["PORTAGE_INST_GID"])

_preinst_bsdflags(mysettings)

destdir = mysettings["D"]
ed_len = len(mysettings["ED"])
unicode_errors = []
desktop_file_validate = \
    portage.process.find_binary("desktop-file-validate") is not None
xdg_dirs = mysettings.get('XDG_DATA_DIRS', '/usr/share').split(':')
xdg_dirs = tuple(os.path.join(i, "applications") + os.sep
    for i in xdg_dirs if i)

qa_desktop_file = ""
try:
    with io.open(_unicode_encode(os.path.join(
            mysettings["PORTAGE_BUILDDIR"],
            "build-info", "QA_DESKTOP_FILE"),
            encoding=_encodings['fs'], errors='strict'),
            mode='r', encoding=_encodings['repo.content'],
            errors='replace') as f:
        qa_desktop_file = f.read()
except IOError as e:
    if e.errno not in (errno.ENOENT, errno.ESTALE):
        raise

qa_desktop_file = qa_desktop_file.split()
if qa_desktop_file:
    if len(qa_desktop_file) > 1:
        qa_desktop_file = "|".join("(%s)" % x for x in qa_desktop_file)
        qa_desktop_file = "^(%s)$" % qa_desktop_file
    else:
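
# Illustration only (values are hypothetical, not from the original file):
# with more than one whitespace-separated entry in QA_DESKTOP_FILE, the
# entries are wrapped in groups, joined with "|", and anchored, so a single
# regular expression can match any of the allowed desktop file names.
import re

patterns = ["foo.desktop", r"org\.gnome\..*"]
combined = "^(%s)$" % "|".join("(%s)" % x for x in patterns)
assert re.match(combined, "foo.desktop")
assert re.match(combined, "org.gnome.Example.desktop")
assert re.match(combined, "bar.desktop") is None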

def scan_pkgs(self, can_force):
    for xpkg in self.effective_scanlist:
        xpkg_continue = False
        # ebuilds and digests added to cvs respectively.
        logging.info("checking package %s", xpkg)
        # save memory by discarding xmatch caches from previous package(s)
        self.caches['arch_xmatch'].clear()
        catdir, pkgdir = xpkg.split("/")
        checkdir = self.repo_settings.repodir + "/" + xpkg
        checkdir_relative = ""
        if self.repolevel < 3:
            checkdir_relative = os.path.join(pkgdir, checkdir_relative)
        if self.repolevel < 2:
            checkdir_relative = os.path.join(catdir, checkdir_relative)
        checkdir_relative = os.path.join(".", checkdir_relative)

        # Run the status check
        if self.kwargs['checks']['ebuild_notadded']:
            self.vcs_settings.status.check(checkdir, checkdir_relative, xpkg)

        if self.generate_manifest:
            if not manifest.Manifest(**self.kwargs).update_manifest(checkdir):
                self.qatracker.add_error("manifest.bad", os.path.join(xpkg, 'Manifest'))
            if self.options.mode == 'manifest':
                continue
        checkdirlist = os.listdir(checkdir)

        dynamic_data = {
            'changelog_modified': False,
            'checkdirlist': ExtendedFuture(checkdirlist),
            'checkdir': checkdir,

def _chpathtool_exit(self, chpathtool):
    if self._final_exit(chpathtool) != os.EX_OK:
        self._writemsg_level("!!! Error Adjusting Prefix to %s\n" %
            (self.settings["EPREFIX"],),
            noiselevel=-1, level=logging.ERROR)
        self._async_unlock_builddir(returncode=self.returncode)
        return

    # We want to install in "our" prefix, not the binary one
    with io.open(_unicode_encode(os.path.join(self._infloc, "EPREFIX"),
            encoding=_encodings['fs'], errors='strict'), mode='w',
            encoding=_encodings['repo.content'], errors='strict') as f:
        f.write(self.settings["EPREFIX"] + "\n")

    # Move the files to the correct location for merge.
    image_tmp_dir = os.path.join(
        self.settings["PORTAGE_BUILDDIR"], "image_tmp")
    build_d = os.path.join(self.settings["D"],
        self._build_prefix.lstrip(os.sep))
    if not os.path.isdir(build_d):
        # Assume this is a virtual package or something.
        shutil.rmtree(self._image_dir)
        ensure_dirs(self.settings["ED"])
    else:
        os.rename(build_d, image_tmp_dir)
        shutil.rmtree(self._image_dir)
        ensure_dirs(os.path.dirname(self.settings["ED"].rstrip(os.sep)))
        os.rename(image_tmp_dir, self.settings["ED"])

    self.wait()

    This is called in the parent process, serially, for each of the
    sync jobs when they complete. Some cache backends such as sqlite
    may require that cache access be performed serially in the
    parent process like this.
    """
    repo = proc.kwargs['repo']
    exitcode = proc.returncode
    updatecache_flg = False
    if proc.returncode == os.EX_OK:
        exitcode, message, updatecache_flg, hooks_enabled = proc.result

    if updatecache_flg and "metadata-transfer" not in self.settings.features:
        updatecache_flg = False

    if updatecache_flg and \
            os.path.exists(os.path.join(
                repo.location, 'metadata', 'md5-cache')):

        # Only update cache for repo.location since that's
        # the only one that's been synced here.
        action_metadata(self.settings, self.portdb, self.emerge_config.opts,
            porttrees=[repo.location])
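
# Generic sketch of the pattern described in the docstring above (this is not
# Portage code, and the repo names are hypothetical): the work runs in child
# processes, but each result is handled by a callback in the parent, one at a
# time, so a backend that only tolerates serial access (such as a sqlite
# cache) is never touched concurrently.
import multiprocessing

def sync_one(repo_name):
    # The parallel-safe part of the job runs in a child process.
    return repo_name, 0  # (repo, exitcode)

def on_done(result):
    # Callbacks run serially in the parent; cache updates would go here.
    repo_name, exitcode = result
    print("synced %s, exitcode %d" % (repo_name, exitcode))

if __name__ == "__main__":
    with multiprocessing.Pool(4) as pool:
        for repo in ("gentoo", "guru", "musl"):
            pool.apply_async(sync_one, (repo,), callback=on_done)
        pool.close()
        pool.join()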

def _set_paths(self, **kwargs):
    repolevel = kwargs.get('repolevel')
    self.relative_path = os.path.join(self.xpkg, self.y_ebuild + ".ebuild")
    self.full_path = os.path.join(self.repo_settings.repodir, self.relative_path)
    self.ebuild_path = self.y_ebuild + ".ebuild"
    if repolevel < 3:
        self.ebuild_path = os.path.join(kwargs.get('pkgdir'), self.ebuild_path)
    if repolevel < 2:
        self.ebuild_path = os.path.join(kwargs.get('catdir'), self.ebuild_path)
    self.ebuild_path = os.path.join(".", self.ebuild_path)
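
# Hypothetical illustration (the helper and its arguments are not from the
# original source): the same joins, applied outside the class, show how the
# ebuild path grows as repoman is run further from the package directory.
import os

def build_ebuild_path(y_ebuild, repolevel, pkgdir, catdir):
    ebuild_path = y_ebuild + ".ebuild"
    if repolevel < 3:
        ebuild_path = os.path.join(pkgdir, ebuild_path)
    if repolevel < 2:
        ebuild_path = os.path.join(catdir, ebuild_path)
    return os.path.join(".", ebuild_path)

print(build_ebuild_path("foo-1.0", 3, "foo", "app-misc"))  # ./foo-1.0.ebuild
print(build_ebuild_path("foo-1.0", 1, "foo", "app-misc"))  # ./app-misc/foo/foo-1.0.ebuild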

for x in portage_bin_path:
    path.append(os.path.join(x, "ebuild-helpers", "xattr"))

if uid != 0 and \
        "unprivileged" in settings.features and \
        "fakeroot" not in settings.features:
    for x in portage_bin_path:
        path.append(os.path.join(x,
            "ebuild-helpers", "unprivileged"))

if settings.get("USERLAND", "GNU") != "GNU":
    for x in portage_bin_path:
        path.append(os.path.join(x, "ebuild-helpers", "bsd"))

for x in portage_bin_path:
    path.append(os.path.join(x, "ebuild-helpers"))
path.extend(prerootpath)

for prefix in prefixes:
    prefix = prefix if prefix else "/"
    for x in ("usr/local/sbin", "usr/local/bin", "usr/sbin", "usr/bin", "sbin", "bin"):
        # Respect order defined in ROOTPATH
        x_abs = os.path.join(prefix, x)
        if x_abs not in rootpath_set:
            path.append(x_abs)

path.extend(rootpath)
settings["PATH"] = ":".join(path)
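
# Simplified, self-contained sketch (the values are hypothetical, not from the
# original source) of the resulting ordering: entries already listed in
# ROOTPATH are skipped in the generic pass so that the order defined in
# ROOTPATH wins, and the ROOTPATH entries are appended afterwards.
import os

demo_path = ["/usr/lib/portage/bin/ebuild-helpers"]
demo_rootpath = ["/usr/sbin", "/opt/bin"]
demo_rootpath_set = frozenset(demo_rootpath)
for x in ("usr/local/sbin", "usr/local/bin", "usr/sbin", "usr/bin", "sbin", "bin"):
    x_abs = os.path.join("/", x)
    if x_abs not in demo_rootpath_set:
        demo_path.append(x_abs)
demo_path.extend(demo_rootpath)
print(":".join(demo_path))
# /usr/lib/portage/bin/ebuild-helpers:/usr/local/sbin:/usr/local/bin:/usr/bin:/sbin:/bin:/usr/sbin:/opt/bin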

def _parse_profile_files_to_tuple_of_dicts(self, file_name, locations,
        juststrings=False, eapi_filter=None):
    return tuple(self._parse_file_to_dict(
        os.path.join(profile.location, file_name), juststrings,
        recursive=profile.portage1_directories, eapi_filter=eapi_filter,
        user_config=profile.user_config, eapi=profile.eapi,
        eapi_default=None, allow_build_id=profile.allow_build_id)
        for profile in locations)