Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, *args, **config):
"""throws InitializationError if needs args aren't specified
gid and perms aren't listed do to an oddity python currying mechanism
gid=portage_gid
perms=0665"""
for x, y in (("gid", -1), ("perms", -1)):
if x in config:
setattr(self, "_"+x, config[x])
del config[x]
else:
setattr(self, "_"+x, y)
super(FsBased, self).__init__(*args, **config)
if self.label.startswith(os.path.sep):
# normpath.
self.label = os.path.sep + os.path.normpath(self.label).lstrip(os.path.sep)
def _get_dest_env_path(self):
return os.path.join(self.settings["T"], "environment")
def isprotected(self, obj):
"""Returns True if obj is protected, False otherwise. The caller must
ensure that obj is normalized with a single leading slash. A trailing
slash is optional for directories."""
masked = 0
protected = 0
sep = os.path.sep
if self.case_insensitive:
obj = obj.lower()
for ppath in self.protect:
if len(ppath) > masked and obj.startswith(ppath):
if ppath in self._dirs:
if obj != ppath and not obj.startswith(ppath + sep):
# /etc/foo does not match /etc/foobaz
continue
elif obj != ppath:
# force exact match when CONFIG_PROTECT lists a
# non-directory
continue
protected = len(ppath)
#config file management
for pmpath in self.protectmask:
if len(pmpath) >= protected and obj.startswith(pmpath):
skip = True
else:
skip = True
if skip:
result.set_result(fetch_tasks)
return
# Parse Manifest for this cp if we haven't yet.
try:
if digests_future.done():
# If there's an exception then raise it.
digests = digests_future.result()
else:
digests = repo_config.load_manifest(
os.path.join(repo_config.location, cpv.cp)).\
getTypeDigests("DIST")
except (EnvironmentError, PortageException) as e:
digests_future.done() or digests_future.set_exception(e)
for filename in uri_map:
config.log_failure(
"%s\t%s\tManifest exception %s" %
(cpv, filename, e))
config.file_failures[filename] = cpv
result.set_result(fetch_tasks)
return
else:
digests_future.done() or digests_future.set_result(digests)
if not digests:
for filename in uri_map:
config.log_failure("%s\t%s\tdigest entry missing" %
def getfetchsizes(self, mypkg, useflags=None, debug=0, myrepo=None):
# returns a filename:size dictionnary of remaining downloads
myebuild, mytree = self.findname2(mypkg, myrepo=myrepo)
if myebuild is None:
raise AssertionError(_("ebuild not found for '%s'") % mypkg)
pkgdir = os.path.dirname(myebuild)
mf = self.repositories.get_repo_for_location(
os.path.dirname(os.path.dirname(pkgdir))).load_manifest(
pkgdir, self.settings["DISTDIR"])
checksums = mf.getDigests()
if not checksums:
if debug:
writemsg(_("[empty/missing/bad digest]: %s\n") % (mypkg,))
return {}
filesdict={}
myfiles = self.getFetchMap(mypkg, useflags=useflags, mytree=mytree)
#XXX: maybe this should be improved: take partial downloads
# into account? check checksums?
for myfile in myfiles:
try:
fetch_size = int(checksums[myfile]["size"])
except (KeyError, ValueError):
def digest_regen(self, updates, removed, manifests, scanner, broken_changelog_manifests):
'''Regenerate manifests
@param updates: updated files
@param removed: removed files
@param manifests: Manifest files
@param scanner: The repoman.scanner.Scanner instance
@param broken_changelog_manifests: broken changelog manifests
'''
if broken_changelog_manifests:
for x in broken_changelog_manifests:
self.repoman_settings["O"] = os.path.join(self.repo_settings.repodir, x)
digestgen(mysettings=self.repoman_settings, myportdb=self.repo_settings.portdb)
if mysettings is None or myportdb is None:
raise TypeError("portage.digestgen(): 'mysettings' and 'myportdb' parameter are required.")
try:
portage._doebuild_manifest_exempt_depend += 1
distfiles_map = {}
fetchlist_dict = FetchlistDict(mysettings["O"], mysettings, myportdb)
for cpv in fetchlist_dict:
try:
for myfile in fetchlist_dict[cpv]:
distfiles_map.setdefault(myfile, []).append(cpv)
except InvalidDependString as e:
writemsg("!!! %s\n" % str(e), noiselevel=-1)
del e
return 0
mytree = os.path.dirname(os.path.dirname(mysettings["O"]))
try:
mf = mysettings.repositories.get_repo_for_location(mytree)
except KeyError:
# backward compatibility
mytree = os.path.realpath(mytree)
mf = mysettings.repositories.get_repo_for_location(mytree)
repo_required_hashes = mf.manifest_required_hashes
if repo_required_hashes is None:
repo_required_hashes = MANIFEST2_HASH_DEFAULTS
mf = mf.load_manifest(mysettings["O"], mysettings["DISTDIR"],
fetchlist_dict=fetchlist_dict)
if not mf.allow_create:
writemsg_stdout(_(">>> Skipping creating Manifest for %s; "
"repository is configured to not use them\n") % mysettings["O"])
def normalize_path(mypath):
"""
os.path.normpath("//foo") returns "//foo" instead of "/foo"
We dislike this behavior so we create our own normpath func
to fix it.
"""
if isinstance(mypath, bytes):
path_sep = os.path.sep.encode()
else:
path_sep = os.path.sep
if mypath.startswith(path_sep):
# posixpath.normpath collapses 3 or more leading slashes to just 1.
return os.path.normpath(2*path_sep + mypath)
else:
return os.path.normpath(mypath)
Create the specified directory. Also, copy gid and group mode
bits from self.pkgdir if possible.
@param cat_dir: Absolute path of the directory to be created.
@type cat_dir: String
"""
try:
pkgdir_st = os.stat(self.pkgdir)
except OSError:
ensure_dirs(path)
return
pkgdir_gid = pkgdir_st.st_gid
pkgdir_grp_mode = 0o2070 & pkgdir_st.st_mode
try:
ensure_dirs(path, gid=pkgdir_gid, mode=pkgdir_grp_mode, mask=0)
except PortageException:
if not os.path.isdir(path):
raise
def digest_check(self, xpkg, checkdir):
self.repoman_settings['O'] = checkdir
self.repoman_settings['PORTAGE_QUIET'] = '1'
if not portage.digestcheck([], self.repoman_settings, strict=1):
self.qatracker.add_error("manifest.bad", os.path.join(xpkg, 'Manifest'))
self.repoman_settings.pop('PORTAGE_QUIET', None)