Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_intersect(self):
open(pjoin(self.dir, 'reg'), 'w').close()
cset = contentsSet([fs.fsFile('reg', strict=False)])
cset = cset.insert_offset(self.dir)
self.assertEqual(contentsSet(livefs.intersect(cset)), cset)
cset = contentsSet([fs.fsFile('reg/foon', strict=False),
fs.fsFile('reg/dar', strict=False),
fs.fsDir('reg/dir', strict=False)]).insert_offset(self.dir)
self.assertEqual(list(livefs.intersect(cset)), [])
cset = contentsSet([fs.fsDir('reg', strict=False)])
self.assertEqual(list(livefs.intersect(cset)), [])
def test_offset_rewriter(self):
f = ["/foon/%i" % x for x in range(10)]
f.extend("/foon/%i/blah" % x for x in range(5))
f = [fs.fsFile(x, strict=False) for x in f]
self.assertEqual(sorted(x.location for x in f),
sorted(x.location for x in self.offset_insert('/', f)))
self.assertEqual(
sorted(f'/usr{x.location}' for x in f),
sorted(x.location for x in self.offset_insert('/usr', f)))
def test_observer_warn(self):
warnings = []
engine = fake_engine(observer=make_fake_reporter(warn=warnings.append))
self._trigger_override = self.kls()
def run(fs_objs, fix_perms=False):
self.kls(fix_perms=fix_perms).trigger(engine,
contentsSet(fs_objs))
run([fs.fsFile('/foon', mode=0o770, strict=False)])
self.assertFalse(warnings)
run([fs.fsFile('/foon', mode=0o772, strict=False)])
self.assertEqual(len(warnings), 1)
self.assertIn('/foon', warnings[0])
warnings[:] = []
run([fs.fsFile('/dar', mode=0o776, strict=False),
fs.fsFile('/bar', mode=0o776, strict=False),
fs.fsFile('/far', mode=0o770, strict=False)])
self.assertEqual(len(warnings), 2)
self.assertIn('/dar', ' '.join(warnings))
self.assertIn('/bar', ' '.join(warnings))
self.assertNotIn('/far', ' '.join(warnings))
self._trigger_override = self.kls()
def run(fs_objs, fix_perms=False):
self.kls(fix_perms=fix_perms).trigger(engine,
contentsSet(fs_objs))
run([fs.fsFile('/foon', mode=0o770, strict=False)])
self.assertFalse(warnings)
run([fs.fsFile('/foon', mode=0o772, strict=False)])
self.assertEqual(len(warnings), 1)
self.assertIn('/foon', warnings[0])
warnings[:] = []
run([fs.fsFile('/dar', mode=0o776, strict=False),
fs.fsFile('/bar', mode=0o776, strict=False),
fs.fsFile('/far', mode=0o770, strict=False)])
self.assertEqual(len(warnings), 2)
self.assertIn('/dar', ' '.join(warnings))
self.assertIn('/bar', ' '.join(warnings))
self.assertNotIn('/far', ' '.join(warnings))
self.clear()
for line in self._get_fd():
if not line:
continue
s = line.split(" ")
if s[0] in ("dir", "dev", "fif"):
path = ' '.join(s[1:])
if s[0] == 'dir':
obj = fs.fsDir(path, strict=False)
elif s[0] == 'dev':
obj = LookupFsDev(path, strict=False)
else:
obj = fs.fsFifo(path, strict=False)
elif s[0] == "obj":
path = ' '.join(s[1:-2])
obj = fs.fsFile(
path, chksums={"md5":long(s[-2], 16)},
mtime=long(s[-1]), strict=False)
elif s[0] == "sym":
try:
p = s.index("->")
obj = fs.fsLink(' '.join(s[1:p]), ' '.join(s[p+1:-1]),
mtime=long(s[-1]), strict=False)
except ValueError:
# XXX throw a corruption error
raise
else:
raise Exception(
"unknown entry type %r" % (line,))
yield obj
def fsobj_to_xarinfo(fsobj):
t = xarfile.XarInfo(fsobj.location)
if isinstance(fsobj, fsFile):
t.type = 'file'
t.size = fsobj.chksums["size"]
elif isinstance(fsobj, fsDir):
t.type = 'directory'
elif isinstance(fsobj, fsSymlink):
t.type = 'symlink'
t.linkname = fsobj.target
elif isinstance(fsobj, fsFifo):
t.type = 'fifo'
elif isinstance(fsobj, fsDev):
if stat.S_ISCHR(fsobj.mode):
t.type = 'chrtype'
else:
t.type = 'blktype'
t.devmajor = fsobj.major
t.devminor = fsobj.minor
if stat_func == os.lstat or e.errno != errno.ENOENT:
raise
stat = os.lstat(real_location)
mode = stat.st_mode
d = {"mtime":stat.st_mtime, "mode":S_IMODE(mode),
"uid":stat.st_uid, "gid":stat.st_gid}
if S_ISREG(mode):
d["size"] = stat.st_size
d["data"] = local_source(real_location)
d["dev"] = stat.st_dev
d["inode"] = stat.st_ino
if chksum_handlers is not None:
d["chf_types"] = chksum_handlers
d.update(overrides)
return fsFile(path, **d)
d.update(overrides)
if S_ISDIR(mode):
return fsDir(path, **d)
elif S_ISLNK(mode):
d["target"] = os.readlink(real_location)
return fsSymlink(path, **d)
elif S_ISFIFO(mode):
return fsFifo(path, **d)
else:
major, minor = get_major_minor(stat)
d["minor"] = minor
d["major"] = major
d["mode"] = mode
return fsDev(path, **d)
"Tarfile file %r is a hardlink to %r, but we can't "
"find the resolved hardlink target %r in the archive. "
"This means either a bug in pkgcore, or a malformed "
"tarball." % (member.name, member.linkname, target))
d["inode"] = inode
else:
d["inode"] = inode = _unique_inode()
# Add the new file to the inode cache even if we're currently processing a
# hardlink; tar allows for hardlink chains of x -> y -> z; thus we have
# to ensure 'y' is in the cache alongside it's target z to support 'x'
# later lookup.
inodes[location] = inode
d["data"] = invokable_data_source.wrap_function(partial(
src_tar.extractfile, member.name), returns_text=False,
returns_handle=True)
yield fsFile(location, **d)
elif member.issym() or member.islnk():
yield fsSymlink(location, member.linkname, **d)
elif member.isfifo():
yield fsFifo(location, **d)
elif member.isdev():
d["major"] = int(member.major)
d["minor"] = int(member.minor)
yield fsDev(location, **d)
else:
raise AssertionError(
"unknown type %r, %r was encounted walking tarmembers" %
(member, member.type))