Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# -- Fragment: Portage portdbapi initialization (the enclosing method is not
# visible in this chunk; indentation appears to have been stripped by
# extraction -- structure must be confirmed against the original file).
# Whitelist the dependency cache directory in the sandbox so metadata
# cache updates by API consumers do not trigger sandbox violations.
if os.environ.get("SANDBOX_ON") == "1":
# Make api consumers exempt from sandbox violations
# when doing metadata cache updates.
sandbox_write = os.environ.get("SANDBOX_WRITE", "").split(":")
if self.depcachedir not in sandbox_write:
sandbox_write.append(self.depcachedir)
# filter(None, ...) drops empty segments before rebuilding the
# colon-separated SANDBOX_WRITE value.
os.environ["SANDBOX_WRITE"] = \
":".join(filter(None, sandbox_write))
# All configured repository locations, in repoLocationList() order.
self.porttrees = list(self.settings.repositories.repoLocationList())
# This is used as sanity check for aux_get(). If there is no
# root eclass dir, we assume that PORTDIR is invalid or
# missing. This check allows aux_get() to detect a missing
# repository and return early by raising a KeyError.
self._have_root_eclass_dir = os.path.isdir(
os.path.join(self.settings.repositories.mainRepoLocation(), "eclass"))
#if the portdbapi is "frozen", then we assume that we can cache everything (that no updates to it are happening)
self.xcache = {}
self.frozen = 0
#Keep a list of repo names, sorted by priority (highest priority first).
self._ordered_repo_name_list = tuple(reversed(self.repositories.prepos_order))
# Metadata cache backend chosen via the portdbapi.auxdbmodule setting.
self.auxdbmodule = self.settings.load_best_module("portdbapi.auxdbmodule")
self.auxdb = {}
# NOTE(review): presumably pre-generated caches shipped with repositories;
# population happens outside this fragment -- verify against full file.
self._pregen_auxdb = {}
# If the current user doesn't have depcachedir write permission,
# then the depcachedir cache is kept here read-only access.
self._ro_auxdb = {}
self._init_cache_dirs()
# -- Fragment: dohtml-style install helper (the enclosing function signature
# is not visible in this chunk; indentation appears stripped by extraction).
# Build the destination under <ED>/usr/share/doc/<PF>/...; the lstrip/rstrip
# of os.sep keeps os.path.join from treating components as absolute paths.
destdir = os.path.join(options.ED, "usr", "share", "doc",
options.PF.lstrip(os.sep), desttree.lstrip(os.sep),
options.doc_prefix.lstrip(os.sep), prefix).rstrip(os.sep)
if not os.path.exists(fullpath):
sys.stderr.write("!!! dohtml: %s does not exist\n" % fullpath)
return False
elif os.path.isfile(fullpath):
# Install a regular file only when its extension (or exact basename)
# is in the allowed sets; otherwise optionally record it as skipped.
ext = os.path.splitext(basename)[1][1:]
if ext in options.allowed_exts or basename in options.allowed_files:
dodir(destdir)
dofile(fullpath, os.path.join(destdir, basename))
elif warn_on_skipped_files and ext not in unwarned_skipped_extensions and basename not in unwarned_skipped_files:
skipped_files.append(fullpath)
elif options.recurse and os.path.isdir(fullpath) and \
basename not in options.disallowed_dirs:
# Recurse into permitted subdirectories, requiring strictly UTF-8
# decodable entry names (a bad name aborts the whole run).
for i in _os.listdir(_unicode_encode(fullpath)):
try:
i = _unicode_decode(i, errors='strict')
except UnicodeDecodeError:
writemsg('dohtml: argument is not encoded as UTF-8: %s\n' %
_unicode_decode(i), noiselevel=-1)
sys.exit(1)
pfx = basename
if prefix:
pfx = os.path.join(prefix, pfx)
install(i, dirname, options, pfx)
elif not options.recurse and os.path.isdir(fullpath):
# Directory encountered without recursion: record it so the caller
# can warn about it later.
global skipped_directories
skipped_directories.append(fullpath)
return False
# -- Fragment: dblink directory-merge logic (the enclosing method is not
# visible; indentation appears stripped by extraction, and the trailing
# movefile() call is truncated in this chunk).
# Save then clear flags on dest.
dflags = mydstat.st_flags
if dflags != 0:
# NOTE(review): presumably *BSD file flags must be cleared before the
# destination can be modified; bsd_chflags availability is checked
# further below, but not here -- confirm against the full file.
bsd_chflags.lchflags(mydest, 0)
if not os.access(mydest, os.W_OK):
# Destination not writable: report the failure and tell the user how
# to resume the merge manually with ebuild(1).
pkgstuff = pkgsplit(self.pkg)
writemsg(_("\n!!! Cannot write to '%s'.\n") % mydest, noiselevel=-1)
writemsg(_("!!! Please check permissions and directories for broken symlinks.\n"))
writemsg(_("!!! You may start the merge process again by using ebuild:\n"))
writemsg("!!! ebuild "+self.settings["PORTDIR"]+"/"+self.cat+"/"+pkgstuff[0]+"/"+self.pkg+".ebuild merge\n")
writemsg(_("!!! And finish by running this: env-update\n\n"))
return 1
if stat.S_ISDIR(mydmode) or \
(stat.S_ISLNK(mydmode) and os.path.isdir(mydest)):
# a symlink to an existing directory will work for us; keep it:
showMessage("--- %s/\n" % mydest)
if bsd_chflags:
# Restore the flags that were cleared above.
bsd_chflags.lchflags(mydest, dflags)
else:
# a non-directory and non-symlink-to-directory. Won't work for us. Move out of the way.
backup_dest = self._new_backup_path(mydest)
msg = []
msg.append("")
msg.append("Installation of a directory is blocked by a file:")
msg.append(" '%s'" % mydest)
msg.append("This file will be renamed to a different name:")
msg.append(" '%s'" % backup_dest)
msg.append("")
self._eerror("preinst", msg)
# NOTE(review): the movefile() call below is cut off in this chunk.
if movefile(mydest, backup_dest,
def dodir(path):
    """Ensure that directory *path* exists with mode 0o755.

    Missing parent directories are created as well.  When the directory
    already exists (for example because another process created it
    concurrently), its permissions are normalized to 0o755; any other
    makedirs failure is propagated to the caller.
    """
    try:
        os.makedirs(path, 0o755)
        return
    except OSError:
        # Only an "already exists" situation is tolerated here.
        if not os.path.isdir(path):
            raise
    # The directory pre-existed -- force the expected mode onto it.
    os.chmod(path, 0o755)
# -- Fragment: fetch() URI and DISTDIR permission setup (the enclosing
# function is not visible; indentation appears stripped, and the 'if'
# matching the 'else' below is outside this chunk).
# Prepend each file's "primary" URIs ahead of its regular ones.
for myfile, uris in filedict.items():
filedict[myfile] = primaryuri_dict.get(myfile, []) + uris
else:
# NOTE(review): in this branch the primary URIs are appended instead;
# the controlling condition is not visible here -- verify the intended
# ordering against the full file.
for myfile in filedict:
filedict[myfile] += primaryuri_dict.get(myfile, [])
# --listonly mode never downloads anything.
can_fetch=True
if listonly:
can_fetch = False
if can_fetch and not fetch_to_ro:
try:
_ensure_distdir(mysettings, mysettings["DISTDIR"])
except PortageException as e:
# Only complain when DISTDIR is actually missing; permission
# problems on an existing directory are caught by the access
# check below.
if not os.path.isdir(mysettings["DISTDIR"]):
writemsg("!!! %s\n" % str(e), noiselevel=-1)
writemsg(_("!!! Directory Not Found: DISTDIR='%s'\n") % mysettings["DISTDIR"], noiselevel=-1)
writemsg(_("!!! Fetching will fail!\n"), noiselevel=-1)
if can_fetch and \
not fetch_to_ro and \
not os.access(mysettings["DISTDIR"], os.W_OK):
writemsg(_("!!! No write access to '%s'\n") % mysettings["DISTDIR"],
noiselevel=-1)
can_fetch = False
distdir_writable = can_fetch and not fetch_to_ro
failed_files = set()
restrict_fetch_msg = False
# "size" is not a real checksum algorithm; exclude it from the hashes
# considered valid for digest verification.
valid_hashes = set(get_valid_checksum_keys())
valid_hashes.discard("size")
This function's handling of EEXIST errors makes it useful for atomic
directory creation, in which multiple processes may be competing to
create the same directory.
"""
# -- Fragment: tail of an ensure_dirs()-style helper (the def line and the
# start of its docstring are outside this chunk; indentation stripped).
created_dir = False
try:
os.makedirs(dir_path)
created_dir = True
except OSError as oe:
# Map the errno onto the project's exception types; a plain EEXIST
# (directory already present) is silently accepted.
func_call = "makedirs('%s')" % dir_path
if oe.errno in (errno.EEXIST,):
pass
else:
if os.path.isdir(dir_path):
# NOTE: DragonFly raises EPERM for makedir('/')
# and that is supposed to be ignored here.
# Also, sometimes mkdir raises EISDIR on FreeBSD
# and we want to ignore that too (bug #187518).
pass
elif oe.errno == errno.EPERM:
raise OperationNotPermitted(func_call)
elif oe.errno == errno.EACCES:
raise PermissionDenied(func_call)
elif oe.errno == errno.EROFS:
raise ReadOnlyFileSystem(func_call)
else:
raise
# Apply any ownership/permission keyword arguments that were supplied.
if kwargs:
perms_modified = apply_permissions(dir_path, **kwargs)
# -- Fragment: directory-name encoding validation plus a VCS-aware tree walk
# (the 'if' matching the leading 'else' is outside this chunk; indentation
# appears stripped by extraction).
else:
try:
directory = _unicode_decode(directory,
encoding=_encodings['fs'], errors='strict')
# Now verify that we can also encode it.
_unicode_encode(directory,
encoding=_encodings['fs'], errors='strict')
except UnicodeError:
# Re-decode with replacement characters purely so the error message
# can show an approximation of the offending path.
directory = _unicode_decode(directory,
encoding=_encodings['fs'], errors='replace')
raise SetConfigError(
_("Directory path contains invalid character(s) for encoding '%s': '%s'") \
% (_encodings['fs'], directory))
# NOTE(review): entries are compared as bytes below (b"." / b"~"), which
# suggests os.walk() yields bytes names here -- confirm how 'directory'
# is encoded in the full file.
vcs_dirs = [_unicode_encode(x, encoding=_encodings['fs']) for x in VCS_DIRS]
if os.path.isdir(directory):
directory = normalize_path(directory)
for parent, dirs, files in os.walk(directory):
try:
parent = _unicode_decode(parent,
encoding=_encodings['fs'], errors='strict')
except UnicodeDecodeError:
continue
# Prune VCS metadata, hidden entries and editor backups in place
# so os.walk() does not descend into them.
for d in dirs[:]:
if d in vcs_dirs or d.startswith(b".") or d.endswith(b"~"):
dirs.remove(d)
for filename in files:
try:
filename = _unicode_decode(filename,
encoding=_encodings['fs'], errors='strict')
except UnicodeDecodeError:
# NOTE(review): 'f.close()' looks spliced in from a different
# fragment; the original likely skipped the bad name -- verify.
f.close()
# -- Fragment: profile-format compatibility checks against a repository's
# layout.conf (the enclosing function and the definition of
# intersecting_repos are not visible; indentation appears stripped).
if intersecting_repos:
# Directories at profile level are permitted either by the EAPI or by
# an explicit profile-formats entry in layout.conf.
allow_directories = eapi_allows_directories_on_profile_level_and_repository_level(eapi) or \
any(x in _portage1_profiles_allow_directories for x in layout_data['profile-formats'])
compat_mode = not eapi_allows_directories_on_profile_level_and_repository_level(eapi) and \
layout_data['profile-formats'] == ('portage-1-compat',)
allow_parent_colon = any(x in _allow_parent_colon
for x in layout_data['profile-formats'])
current_formats = tuple(layout_data['profile-formats'])
if compat_mode:
# In compat mode, warn when the profile implicitly relies on the
# legacy 'portage-1' directory layout the repo does not declare.
offenders = _PORTAGE1_DIRECTORIES.intersection(os.listdir(currentPath))
offenders = sorted(x for x in offenders
if os.path.isdir(os.path.join(currentPath, x)))
if offenders:
warnings.warn(_(
"\nThe selected profile is implicitly using the 'portage-1' format:\n"
"\tprofile = %(profile_path)s\n"
"But this repository is not using that format:\n"
"\trepo = %(repo_name)s\n"
"This will break in the future. Please convert these dirs to files:\n"
"\t%(files)s\n"
"Or, add this line to the repository's layout.conf:\n"
"\tprofile-formats = portage-1")
% dict(profile_path=currentPath, repo_name=repo_loc,
files='\n\t'.join(offenders)))
# Profiles chain via a 'parent' file listing further profile paths.
parentsFile = os.path.join(currentPath, "parent")
if exists_raise_eaccess(parentsFile):
parents = grabfile(parentsFile)
# -- Fragment: GNU info directory index regeneration (the enclosing function
# is not visible; the final condition is truncated mid-line in this chunk;
# indentation appears stripped by extraction).
if portage.util.noiselimit >= 0:
out.einfo("GNU info directory index is up-to-date.")
else:
portage.util.writemsg_stdout("\n")
if portage.util.noiselimit >= 0:
out.einfo("Regenerating GNU info directory index...")
# info pages may be compressed; every candidate suffix is checked.
dir_extensions = ("", ".gz", ".bz2")
icount = 0
badcount = 0
errmsg = ""
for inforoot in regen_infodirs:
if inforoot == '':
continue
# Skip roots that are missing or that we cannot modify.
if not os.path.isdir(inforoot) or \
not os.access(inforoot, os.W_OK):
continue
# Sort for a deterministic processing order.
file_list = os.listdir(inforoot)
file_list.sort()
dir_file = os.path.join(inforoot, "dir")
moved_old_dir = False
processed_count = 0
for x in file_list:
# Ignore hidden entries and subdirectories.
if x.startswith(".") or \
os.path.isdir(os.path.join(inforoot, x)):
continue
# Special handling for the index file itself ("dir", "dir.gz", ...);
# the comparison below is cut off at the end of this chunk.
if x.startswith("dir"):
skip = False
for ext in dir_extensions:
if x == "dir" + ext or \
# -- Fragment: emirrordist-style CLI option validation ('options' and
# 'parser' come from the enclosing function, which is not visible here;
# indentation appears stripped by extraction).
# Each *_log option is normalized to an absolute path, and its parent
# directory must already exist and be writable/searchable, since the log
# file will be created inside it.
if options.failure_log is not None:
options.failure_log = normalize_path(
os.path.abspath(options.failure_log))
parent_dir = os.path.dirname(options.failure_log)
if not (os.path.isdir(parent_dir) and
os.access(parent_dir, os.W_OK|os.X_OK)):
parser.error(("--failure-log '%s' parent is not a "
"writable directory") % options.failure_log)
if options.success_log is not None:
options.success_log = normalize_path(
os.path.abspath(options.success_log))
parent_dir = os.path.dirname(options.success_log)
if not (os.path.isdir(parent_dir) and
os.access(parent_dir, os.W_OK|os.X_OK)):
parser.error(("--success-log '%s' parent is not a "
"writable directory") % options.success_log)
if options.scheduled_deletion_log is not None:
options.scheduled_deletion_log = normalize_path(
os.path.abspath(options.scheduled_deletion_log))
parent_dir = os.path.dirname(options.scheduled_deletion_log)
if not (os.path.isdir(parent_dir) and
os.access(parent_dir, os.W_OK|os.X_OK)):
parser.error(("--scheduled-deletion-log '%s' parent is not a "
"writable directory") % options.scheduled_deletion_log)
# The scheduled-deletion log only makes sense together with the
# deletion database that tracks those deletions.
if options.deletion_db is None:
parser.error("--scheduled-deletion-log requires --deletion-db")