Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): this excerpt has lost its indentation and stops partway through
# the final fake_engine(...) call (the argument list is never closed), so it is
# not runnable as shown.
def test_it(self):
# Baseline content set: two file entries and two directory entries.
orig = contentsSet([
fs.fsFile('/cheddar', strict=False),
fs.fsFile('/sporks-suck', strict=False),
fs.fsDir('/foons-rule', strict=False),
fs.fsDir('/mango', strict=False)
])
engine = fake_engine(mode=const.INSTALL_MODE)
# Helper: copy the baseline, invoke self.kls(func) on the copy, return the copy.
# `engine` is read from the enclosing scope at call time.
def run(func):
new = contentsSet(orig)
self.kls(func)(engine, {'new_cset':new})
return new
# A predicate that is always False must leave the set equal to the baseline.
self.assertEqual(orig, run(lambda s:False))
# With an isinstance(..., fs.fsDir) predicate, dirs() must come back empty...
self.assertEqual([], run(post_curry(isinstance, fs.fsDir)).dirs())
# ...and dirs(True) must match the baseline's files() once both are sorted.
self.assertEqual(sorted(orig.files()),
sorted(run(post_curry(isinstance, fs.fsDir)).dirs(True)))
# check noisiness.
info = []
engine = fake_engine(observer=make_fake_reporter(info=info.append),
def test_it(self):
    """Run self.kls with different predicates and check the resulting set.

    First pass uses an INSTALL_MODE engine and checks the contents of the
    resulting set; second pass uses a REPLACE_MODE engine with a fake
    reporter and checks what gets sent to its ``info`` channel.
    """
    baseline = contentsSet([
        fs.fsFile('/cheddar', strict=False),
        fs.fsFile('/sporks-suck', strict=False),
        fs.fsDir('/foons-rule', strict=False),
        fs.fsDir('/mango', strict=False)
    ])
    engine = fake_engine(mode=const.INSTALL_MODE)

    def run(func):
        # `engine` is resolved when run() is called, so rebinding it further
        # down changes which engine later invocations use.
        working = contentsSet(baseline)
        trigger = self.kls(func)
        trigger(engine, {'new_cset': working})
        return working

    def dir_predicate():
        # Build a fresh predicate per call, as the assertions each need one.
        return post_curry(isinstance, fs.fsDir)

    # An always-False predicate leaves the set equal to the baseline.
    untouched = run(lambda s: False)
    self.assertEqual(baseline, untouched)

    # With the fsDir predicate no plain dirs() remain...
    after_filter = run(dir_predicate())
    self.assertEqual([], after_filter.dirs())

    # ...and dirs(True) lines up with the baseline's files().
    expected = sorted(baseline.files())
    actual = sorted(run(dir_predicate()).dirs(True))
    self.assertEqual(expected, actual)

    # check noisiness.
    info = []
    reporter = make_fake_reporter(info=info.append)
    engine = fake_engine(observer=reporter, mode=const.REPLACE_MODE)

    run(lambda s: False)
    self.assertFalse(info)

    run(dir_predicate())
    self.assertEqual(len(info), 2)

    # ensure only the relevant files show.
    reported = ' '.join(info)
    self.assertNotIn('/cheddar', reported)
    self.assertNotIn('/sporks-suck', reported)
def test_it(self):
    """Check ops.default_mkdir with an implicit mode and with mode=0o750."""
    plain_dir = fs.fsDir(pjoin(self.dir, "mkdir_test"), strict=False)
    self.assertTrue(ops.default_mkdir(plain_dir))

    # os.umask() only reports the previous mask by installing a new one, so
    # set it to 0 to read it and put the old value back no matter what.
    old_umask = os.umask(0)
    try:
        created_mode = os.stat(plain_dir.location).st_mode & 0o4777
        self.assertEqual(created_mode, 0o777 & ~old_umask)
    finally:
        os.umask(old_umask)
    os.rmdir(plain_dir.location)

    explicit_dir = fs.fsDir(
        pjoin(self.dir, "mkdir_test2"), strict=False, mode=0o750)
    self.assertTrue(ops.default_mkdir(explicit_dir))
    explicit_mode = os.stat(explicit_dir.location).st_mode & 0o4777
    self.assertEqual(explicit_mode, 0o750)
def test_intersect(self):
    """Check livefs.intersect against a single regular file named 'reg'."""
    # Create an empty regular file at <self.dir>/reg.
    with open(pjoin(self.dir, 'reg'), 'w'):
        pass

    # A file entry rebased under self.dir must round-trip through intersect.
    on_disk = contentsSet([fs.fsFile('reg', strict=False)])
    on_disk = on_disk.insert_offset(self.dir)
    self.assertEqual(contentsSet(livefs.intersect(on_disk)), on_disk)

    # Entries located beneath 'reg' (which is a file) must yield nothing.
    beneath_file = contentsSet([
        fs.fsFile('reg/foon', strict=False),
        fs.fsFile('reg/dar', strict=False),
        fs.fsDir('reg/dir', strict=False),
    ])
    beneath_file = beneath_file.insert_offset(self.dir)
    self.assertEqual(list(livefs.intersect(beneath_file)), [])

    # This set is intentionally left without insert_offset, so 'reg' is not
    # rebased under self.dir; it must also yield nothing.
    not_rebased = contentsSet([fs.fsDir('reg', strict=False)])
    self.assertEqual(list(livefs.intersect(not_rebased)), [])
# Clears self, then parses each line from self._get_fd() into an fs object
# chosen by the line's leading type tag ("dir", "dev", "fif", "obj", "sym").
# NOTE(review): the excerpt has lost its indentation and stops at the
# `except ValueError:` below without a handler body; what happens to `obj`
# after it is built is not visible here.
def _iter_contents(self):
self.clear()
for line in self._get_fd():
# Skip empty lines.
if not line:
continue
# Fields are separated by single spaces; s[0] is the type tag.
s = line.split(" ")
if s[0] in ("dir", "dev", "fif"):
# Everything after the tag is the path; re-joining with spaces restores
# any spaces the split removed from it.
path = ' '.join(s[1:])
if s[0] == 'dir':
obj = fs.fsDir(path, strict=False)
elif s[0] == 'dev':
obj = LookupFsDev(path, strict=False)
else:
obj = fs.fsFifo(path, strict=False)
elif s[0] == "obj":
# Last two fields are the md5 (parsed as hex) and the mtime (parsed as int);
# the fields between the tag and those two form the path.
path = ' '.join(s[1:-2])
obj = fs.fsFile(
path, chksums={"md5":int(s[-2], 16)},
mtime=int(s[-1]), strict=False)
elif s[0] == "sym":
try:
# The "->" field separates the link path from its target; the final
# field is the mtime. s.index raises ValueError when "->" is absent.
p = s.index("->")
obj = fs.fsLink(' '.join(s[1:p]), ' '.join(s[p+1:-1]),
mtime=int(s[-1]), strict=False)
except ValueError:
# NOTE(review): headerless fragment with indentation stripped. `stat`, `path`,
# `real_location`, `overrides` and `chksum_handlers` are bound outside the
# visible lines; presumably this is the tail of a function that turns a stat
# result into an fs object -- confirm against the full file.
mode = stat.st_mode
# Attributes shared by every object type, copied from the stat result.
d = {"mtime":stat.st_mtime, "mode":S_IMODE(mode),
"uid":stat.st_uid, "gid":stat.st_gid}
if S_ISREG(mode):
# Regular file: add size, a data source for the on-disk location, and the
# device/inode pair.
d["size"] = stat.st_size
d["data"] = local_source(real_location)
d["dev"] = stat.st_dev
d["inode"] = stat.st_ino
if chksum_handlers is not None:
d["chf_types"] = chksum_handlers
# Caller-supplied overrides are applied last so they win over stat values.
d.update(overrides)
return fsFile(path, **d)
# Non-regular objects: apply overrides, then dispatch on the file type bits.
d.update(overrides)
if S_ISDIR(mode):
return fsDir(path, **d)
elif S_ISLNK(mode):
d["target"] = os.readlink(real_location)
return fsSymlink(path, **d)
elif S_ISFIFO(mode):
return fsFifo(path, **d)
else:
# Anything else is built as a device node.
major, minor = get_major_minor(stat)
d["minor"] = minor
d["major"] = major
# Replaces the S_IMODE() value (and any override of "mode") with the full
# st_mode, which includes the file-type bits.
d["mode"] = mode
return fsDev(path, **d)
# NOTE(review): headerless fragment with indentation stripped. `depset`,
# `revdep`, `name`, `out`, `options`, `pkg` and `printed_something` are bound
# outside the visible lines, and the leading `continue` belongs to a loop whose
# header is not shown.
continue
# Report nodes returned by find_cond_nodes that have no restrictions attached
# and intersect the reverse dependency.
for key, restricts in depset.find_cond_nodes(depset.restrictions, True):
if not restricts and key.intersects(revdep):
out.write(f' {name} on {revdep} through {key}')
# Report conditional nodes, listing the restrictions joined with ' or '.
for key, restricts in depset.node_conds.items():
if key.intersects(revdep):
restricts = ' or '.join(map(str, restricts))
out.write(f' {name} on {revdep} through {key} if USE {restricts},')
# If we printed anything at all print the newline now
out.autoline = True
if printed_something:
out.write()
if options.contents:
# Formatter attributes applied per object class when colour is enabled;
# classes missing from the map get no extra attributes.
color = {
fs_module.fsDir: [out.bold, out.fg('blue')],
fs_module.fsLink: [out.bold, out.fg('cyan')],
}
for obj in sorted(obj for obj in get_pkg_attr(pkg, 'contents', ())):
if options.color:
out.write(*(color.get(obj.__class__, []) + [obj] + [out.reset]))
else:
out.write(f'{obj!r}')
if options.size:
# Count the package's content entries and sum their sizes as reported by
# os.lstat.
size = 0
files = 0
for location in (obj.location for obj in get_pkg_attr(pkg, 'contents', ())):
files += 1
size += os.lstat(location).st_size
out.write(f'Total files: {files}')
out.write(f'Total size: {sizeof_fmt(size)}')
# Builds a xarfile.XarInfo for `fsobj`, setting its type (plus type-specific
# fields) from the fs object's class and copying mode/uid/gid across.
# NOTE(review): indentation has been stripped and no `return` is visible in
# this excerpt; the function is presumably cut off after the gid assignment --
# confirm against the full file.
def fsobj_to_xarinfo(fsobj):
t = xarfile.XarInfo(fsobj.location)
if isinstance(fsobj, fsFile):
t.type = 'file'
# Size is read from the object's checksum mapping.
t.size = fsobj.chksums["size"]
elif isinstance(fsobj, fsDir):
t.type = 'directory'
elif isinstance(fsobj, fsSymlink):
t.type = 'symlink'
t.linkname = fsobj.target
elif isinstance(fsobj, fsFifo):
t.type = 'fifo'
elif isinstance(fsobj, fsDev):
# Character vs. block device is decided from the mode's file-type bits.
if stat.S_ISCHR(fsobj.mode):
t.type = 'chrtype'
else:
t.type = 'blktype'
# NOTE(review): presumably these two lines sit inside the fsDev branch --
# the flattened source does not show the nesting.
t.devmajor = fsobj.major
t.devminor = fsobj.minor
t.mode = fsobj.mode
t.uid = fsobj.uid
t.gid = fsobj.gid
if callback is None:
callback = lambda obj:None
ensure_perms = get_plugin("fs_ops.ensure_perms")
copyfile = get_plugin("fs_ops.copyfile")
mkdir = get_plugin("fs_ops.mkdir")
if not isinstance(cset, contents.contentsSet):
raise TypeError(f'cset must be a contentsSet, got {cset!r}')
if offset is not None:
if os.path.exists(offset):
if not os.path.isdir(offset):
raise TypeError(f'offset must be a dir, or not exist: {offset}')
else:
mkdir(fs.fsDir(offset, strict=False))
iterate = partial(contents.offset_rewriter, offset.rstrip(os.path.sep))
else:
iterate = iter
d = list(iterate(cset.iterdirs()))
d.sort()
for x in d:
callback(x)
try:
# we pass in the stat ourselves, using stat instead of
# lstat gen_obj uses internally; this is the equivalent of
# "deference that link"
obj = gen_obj(x.location, stat=os.stat(x.location))
if not fs.isdir(obj):
# according to the spec, dirs can't be merged over files