# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): indentation has been stripped from this fragment, and the
# trailing _start_task(FileDigester(... call is truncated mid-expression.
def _fetch_fs(self, mirror_info):
# Try to reuse a previously fetched copy of self.distfile from a local
# filesystem mirror: stat the candidate path, and only proceed toward
# digest verification when its size matches the recorded digest size.
file_path = os.path.join(mirror_info.location, self.distfile)
st = None
size_ok = False
try:
st = os.stat(file_path)
except OSError as e:
# ENOENT/ESTALE just mean this mirror lacks the file; any other
# stat failure is unexpected and is logged.
if e.errno not in (errno.ENOENT, errno.ESTALE):
msg = "%s stat failed in %s: %s" % \
(self.distfile, mirror_info.name, e)
self.scheduler.output(msg + '\n', background=True,
log_path=self._log_path)
logging.error(msg)
else:
# The file exists: its size must equal the expected digest size
# before we spend time hashing it.
size_ok = st.st_size == self.digests["size"]
self._current_stat = st
if size_ok:
self._current_mirror = mirror_info
# Start asynchronous digest verification of the candidate file
# (call truncated in this view).
self._start_task(
FileDigester(file_path=file_path,
hash_names=(self._select_hash(),),
def _update_recycle_db(self):
# Synchronize the on-disk recycle directory with recycle_db: newly seen
# files are recorded with the current start_time, files whose size
# changed are re-stamped, and entries older than recycle_deletion_delay
# become deletion candidates.
# NOTE(review): fragment is truncated — the body of the final elif
# (expired entries) is not visible here; indentation has been stripped.
start_time = self._config.start_time
recycle_dir = self._config.options.recycle_dir
recycle_db = self._config.recycle_db
r_deletion_delay = self._config.options.recycle_deletion_delay
# Use a dict to optimize access (snapshot of the current db entries;
# entries are popped as their files are found on disk).
recycle_db_cache = dict(recycle_db.items())
for filename in os.listdir(recycle_dir):
recycle_file = os.path.join(recycle_dir, filename)
try:
st = os.stat(recycle_file)
except OSError as e:
# ENOENT/ESTALE: file vanished between listdir and stat — ignore.
if e.errno not in (errno.ENOENT, errno.ESTALE):
logging.error(("stat failed for '%s' in "
"recycle: %s") % (filename, e))
continue
value = recycle_db_cache.pop(filename, None)
if value is None:
# First time this file is seen: record its size and timestamp.
logging.debug(("add '%s' to "
"recycle db") % filename)
recycle_db[filename] = (st.st_size, start_time)
else:
r_size, r_time = value
if int(r_size) != st.st_size:
# Size changed since the last scan; reset the timestamp.
recycle_db[filename] = (st.st_size, start_time)
elif r_time + r_deletion_delay < start_time:
def load(self):
# Reload the backing file only when it has not been loaded yet or its
# mtime changed since the last load; loader errors are collected into
# self.errors as "filename: message" strings.
# NOTE(review): fragment appears truncated — atoms_changed/nonatoms are
# presumably consumed after this point; indentation has been stripped.
atoms_changed = False
try:
mtime = os.stat(self._filename).st_mtime
except (OSError, IOError):
# Missing/unreadable file: no mtime, which forces a (re)load below
# whenever the previous state recorded one.
mtime = None
if (not self._loaded or self._mtime != mtime):
try:
data, errors = self.loader.load()
# Flatten per-file loader errors into self.errors.
for fname in errors:
for e in errors[fname]:
self.errors.append(fname+": "+e)
except EnvironmentError as e:
# A missing file simply yields an empty set; anything else
# propagates to the caller.
if e.errno != errno.ENOENT:
raise
del e
data = {}
nonatoms = list(data)
self._mtime = mtime
atoms_changed = True
# NOTE(review): this fragment begins mid-function (the enclosing def is not
# visible) and its indentation has been stripped.
if entry not in _digests:
# The Manifest lacks a digest for this file — record a QA error.
self.qatracker.add_error("digest.missing", checkdir + "::" + entry)
del myfiles_all
if os.path.exists(checkdir + "/files"):
filesdirlist = os.listdir(checkdir + "/files")
# Recurse through files directory, use filesdirlist as a stack;
# appending directories as needed,
# so people can't hide > 20k files in a subdirectory.
while filesdirlist:
y = filesdirlist.pop(0)
relative_path = os.path.join(xpkg, "files", y)
full_path = os.path.join(self.repo_settings.repodir, relative_path)
try:
mystat = os.stat(full_path)
except OSError as oe:
# NOTE(review): 2 is errno.ENOENT — the symbolic constant would be
# clearer and consistent with the rest of this codebase.
if oe.errno == 2:
# don't worry about it. it likely was removed via fix above.
continue
else:
raise oe
if S_ISDIR(mystat.st_mode):
# Skip VCS metadata directories; otherwise push the directory's
# entries onto the stack for later processing.
if self.vcs_settings.status.isVcsDir(y):
continue
for z in os.listdir(checkdir + "/files/" + y):
if self.vcs_settings.status.isVcsDir(z):
continue
filesdirlist.append(y + "/" + z)
# Current policy is no files over 20 KiB, these are the checks.
# File size between 20 KiB and 60 KiB causes a warning,
# while file size over 60 KiB causes an error.
@type onProgress: Function
@rtype: dict
@return: dictionary of packages that failed to merges
"""
# NOTE(review): the def line and the opening of this docstring are outside
# this view; indentation has been stripped from the fragment.
failed_pkgs = {}
# Scan every category directory under the vardb path for package
# directories whose name contains MERGING_IDENTIFIER, i.e. merges that
# never completed.
for cat in os.listdir(self._vardb_path):
pkgs_path = os.path.join(self._vardb_path, cat)
if not os.path.isdir(pkgs_path):
continue
pkgs = os.listdir(pkgs_path)
maxval = len(pkgs)
for i, pkg in enumerate(pkgs):
# Report per-package progress when a callback was supplied.
if onProgress:
onProgress(maxval, i+1)
if MERGING_IDENTIFIER in pkg:
# Key the result by "category/pkg"; the directory mtime serves
# as the approximate time of the failed merge.
mtime = int(os.stat(os.path.join(pkgs_path, pkg)).st_mtime)
pkg = os.path.join(cat, pkg)
failed_pkgs[pkg] = mtime
return failed_pkgs
# If the file is empty then it's obviously invalid. Don't
# trust the return value from the fetcher. Remove the
# empty file and try to download again.
# NOTE(review): this fragment begins mid-loop (enclosing function not
# visible) and the final writemsg_level(...) call is truncated; indentation
# has been stripped.
try:
mystat = os.lstat(download_path)
# Also discard dangling symlinks: lstat succeeds but the target is gone.
if mystat.st_size == 0 or (stat.S_ISLNK(mystat.st_mode) and not os.path.exists(download_path)):
os.unlink(download_path)
fetched = 0
continue
except EnvironmentError:
# Best-effort cleanup: if stat/unlink fails, fall through to the
# digest checks below.
pass
if mydigests is not None and myfile in mydigests:
# Digest data exists for this file — stat it so size/type can be
# validated before hashing.
try:
mystat = os.stat(download_path)
except OSError as e:
# ENOENT/ESTALE mean the file is simply absent; other errors raise.
if e.errno not in (errno.ENOENT, errno.ESTALE):
raise
del e
fetched = 0
else:
if stat.S_ISDIR(mystat.st_mode):
# This can happen if FETCHCOMMAND erroneously
# contains wget's -P option where it should
# instead have -O.
writemsg_level(
_("!!! The command specified in the "
"%s variable appears to have\n!!! "
"created a directory instead of a "
"normal file.\n") % command_var,
# NOTE(review): this fragment begins mid-function — the option validation
# preceding this raise is outside this view; indentation has been stripped.
raise SetConfigError(_("no more than one of these options is allowed: 'package', 'filestamp', 'seconds', 'date'"))
# Resolve the reference date according to whichever single format option
# was supplied: a package's installation time, a file's mtime, raw epoch
# seconds, or a parsed date string.
setformat = formats[0]
if (setformat == "package"):
package = options.get("package")
try:
# Use BUILD_TIME of the first installed match as the reference date.
# NOTE(review): an empty match list raises IndexError, which this
# except clause does not catch — verify whether that is intended.
cpv = vardbapi.match(package)[0]
date, = vardbapi.aux_get(cpv, ('BUILD_TIME',))
date = int(date)
except (KeyError, ValueError):
raise SetConfigError(_("cannot determine installation date of package %s") % package)
elif (setformat == "filestamp"):
# Reference date is the mtime of the named file.
filestamp = options.get("filestamp")
try:
date = int(os.stat(filestamp).st_mtime)
except (OSError, ValueError):
raise SetConfigError(_("cannot determine 'filestamp' of '%s'") % filestamp)
elif (setformat == "seconds"):
# Reference date given directly as epoch seconds.
try:
date = int(options.get("seconds"))
except ValueError:
raise SetConfigError(_("option 'seconds' must be an integer"))
else:
# "date" format: parse with the supplied (or locale-default) dateformat.
dateopt = options.get("date")
try:
dateformat = options.get("dateformat", "%x %X")
date = int(time.mktime(time.strptime(dateopt, dateformat)))
except ValueError:
raise SetConfigError(_("'date=%s' does not match 'dateformat=%s'") % (dateopt, dateformat))
return DateSet(vardb=vardbapi, date=date, mode=mode)
def hardlink_is_mine(link, lock):
    """Return True if we hold the hardlink lock.

    Ownership is established when the lock file has exactly two hard
    links and our link path refers to the same inode on the same device.
    Any stat failure (e.g. either path missing) means the lock is not ours.
    """
    try:
        lock_stat = os.stat(lock)
        # A held lock has exactly two names: the lock file and our link.
        if lock_stat.st_nlink != 2:
            return False
        link_stat = os.stat(link)
    except OSError:
        # Best-effort: missing or unreadable paths mean "not mine".
        return False
    return (lock_stat.st_dev == link_stat.st_dev
            and lock_stat.st_ino == link_stat.st_ino)