Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_init(self):
    # Building a set from self.all must keep one entry per item.
    populated = contents.contentsSet(self.all)
    self.assertEqual(len(self.all), len(populated))

    # Appending the int 1 to the initial entries must raise TypeError.
    bad_initial = self.all + [1]
    self.assertRaises(TypeError, contents.contentsSet, bad_initial)

    # Smoke checks: construction must succeed both with the default
    # arguments and with mutable=True.
    contents.contentsSet(self.all)
    contents.contentsSet(self.all, mutable=True)

    # The initial entries argument must remain optional; constructing
    # with no arguments yields an empty set.
    empty = contents.contentsSet()
    self.assertEqual(len(empty), 0)
def test_add_missing_directories(self):
    # Two files and one directory whose parent directories are not
    # themselves part of the initial entries.
    initial = [
        self.mk_file("/dir1/a"),
        self.mk_file("/dir2/dir3/b"),
        self.mk_dir("/dir1/dir4"),
    ]
    cs = contents.contentsSet(initial)
    cs.add_missing_directories()

    # After the call every parent directory must be present as well.
    expected_locations = [
        '/dir1',
        '/dir1/a',
        '/dir1/dir4',
        '/dir2',
        '/dir2/dir3',
        '/dir2/dir3/b',
    ]
    found_locations = sorted(entry.location for entry in cs)
    self.assertEqual(found_locations, expected_locations)

    # The '/dir1' entry added by the call is expected to have mode 0o775.
    added_dir = cs['/dir1']
    self.assertEqual(added_dir.mode, 0o775)
def test_intersect(self):
    # Create an empty regular file named 'reg' under the test directory.
    with open(pjoin(self.dir, 'reg'), 'w'):
        pass

    # A file entry for 'reg', offset into the test directory, must come
    # back unchanged from livefs.intersect().
    existing = contentsSet([fs.fsFile('reg', strict=False)])
    existing = existing.insert_offset(self.dir)
    self.assertEqual(contentsSet(livefs.intersect(existing)), existing)

    # Entries located beneath 'reg' (which is a regular file on disk)
    # must yield no intersection at all.
    beneath_file = contentsSet([
        fs.fsFile('reg/foon', strict=False),
        fs.fsFile('reg/dar', strict=False),
        fs.fsDir('reg/dir', strict=False),
    ]).insert_offset(self.dir)
    self.assertEqual(list(livefs.intersect(beneath_file)), [])

    # A directory entry named 'reg' (no offset applied here) must not
    # intersect either.
    as_directory = contentsSet([fs.fsDir('reg', strict=False)])
    self.assertEqual(list(livefs.intersect(as_directory)), [])
def assertCsetNotEqual(self, cset1, cset2):
    """Assert that two content sets differ.

    Either argument that is not already a ``contentsSet`` is wrapped in
    one before the comparison; the comparison itself is delegated to
    ``self.assertNotEqual`` with ``reflective=False``.
    """
    left = cset1 if isinstance(cset1, contentsSet) else contentsSet(cset1)
    right = cset2 if isinstance(cset2, contentsSet) else contentsSet(cset2)
    self.assertNotEqual(left, right, reflective=False)
def listobj(self, name, obj_class=None):
    """Check the ``contentsSet`` accessor called ``name``.

    The fixture attribute ``self.<name>`` supplies the entries; a set is
    built from them and its method of the same name is called.  The
    result must contain exactly the fixture entries and, when
    ``obj_class`` is given, every returned item must be an instance of it.
    """
    expected_entries = getattr(self, name)
    cs = contents.contentsSet(expected_entries)
    accessor = getattr(cs, name)
    returned = accessor()

    if obj_class is not None:
        # NOTE(review): if the accessor ever returns a one-shot iterator,
        # this loop would exhaust it before the set comparison below --
        # confirm it returns a re-iterable container.
        for item in returned:
            self.assertInstance(item, obj_class)

    self.assertEqual(set(returned), set(expected_entries))
def test_remove(self):
    # An immutable set must refuse removal regardless of the argument.
    self.assertRaises(AttributeError,
                      contents.contentsSet(mutable=False).remove, self.devs[0])
    self.assertRaises(AttributeError,
                      contents.contentsSet(mutable=False).remove, 1)

    # Removal by object: once every entry has been removed the set must
    # be empty.  This pass previously had no assertion, so a remove()
    # that silently did nothing for objects would have gone unnoticed.
    cs = contents.contentsSet(self.all, mutable=True)
    for x in self.all:
        cs.remove(x)
    self.assertEqual(len(cs), 0)

    # Removal by location string must empty the set in the same way.
    cs = contents.contentsSet(self.all, mutable=True)
    for x in self.all:
        cs.remove(x.location)
    self.assertEqual(len(cs), 0)

    # Removing an entry that is no longer present raises KeyError.
    self.assertRaises(KeyError, cs.remove, self.all[0])
def test_callback(self):
    # Run the check for every dict-valued attribute whose name starts
    # with 'entries' and does not contain 'fail'.
    for attr_name in dir(self):
        if attr_name.startswith('entries') and 'fail' not in attr_name:
            layout = getattr(self, attr_name)
            if isinstance(layout, dict):
                _src, dest, cset = self.generic_merge_bits(layout)

                # Entries as they should look once rewritten under dest.
                rewritten = contents.contentsSet(
                    contents.offset_rewriter(dest, cset))
                pending = set(rewritten)

                # The callback removes each reported entry; anything left
                # over afterwards was never reported by merge_contents.
                ops.merge_contents(cset, offset=dest,
                                   callback=pending.remove)
                self.assertFalse(pending)
#control['Section'] = pkg.category
control['Version'] = pkg.fullver
control['Architecture'] = platform
if maintainer:
control['Maintainer'] = maintainer
control['Description'] = pkg.description
pkgdeps = "%s" % (pkg.rdepend,)
if (pkgdeps is not None and pkgdeps != ""):
control.update(parsedeps(pkgdeps))
control_ds = text_data_source("".join("%s: %s\n" % (k, v)
for (k, v) in control.items()))
control_path = pjoin(tempspace, 'control.tar.gz')
tar.write_set(
contents.contentsSet([
fs.fsFile('control',
{'size':len(control_ds.text_fileobj().getvalue())},
data=control_ds,
uid=0, gid=0, mode=0o644, mtime=time.time())
]),
control_path, compressor='gz')
dbinary_path = pjoin(tempspace, 'debian-binary')
with open(dbinary_path, 'w') as f:
f.write("2.0\n")
ret = spawn(['ar', '-r', finalpath, dbinary_path, data_path, control_path])
if ret != 0:
unlink_if_exists(finalpath)
raise Exception("failed creating archive: return code %s" % (ret,))
:param cset: :class:`pkgcore.fs.contents.contentsSet` instance
:param offset: if not None, offset to prefix all locations with.
Think of it as target dir.
:param callback: callable to report each entry being merged; given a single arg,
the fs object being merged.
:raise EnvironmentError: Thrown for permission failures.
"""
if callback is None:
callback = lambda obj:None
ensure_perms = get_plugin("fs_ops.ensure_perms")
copyfile = get_plugin("fs_ops.copyfile")
mkdir = get_plugin("fs_ops.mkdir")
if not isinstance(cset, contents.contentsSet):
raise TypeError(f'cset must be a contentsSet, got {cset!r}')
if offset is not None:
if os.path.exists(offset):
if not os.path.isdir(offset):
raise TypeError(f'offset must be a dir, or not exist: {offset}')
else:
mkdir(fs.fsDir(offset, strict=False))
iterate = partial(contents.offset_rewriter, offset.rstrip(os.path.sep))
else:
iterate = iter
d = list(iterate(cset.iterdirs()))
d.sort()
for x in d:
callback(x)
def symmetric_difference(self, other):
    """Return a new ``contentsSet`` of entries in exactly one of the two sets.

    The intersection of ``self`` and ``other`` is computed first; what
    remains of each operand after that intersection is removed is chained
    together into the result.
    """
    shared = self.intersection(other)
    only_in_self = iter(self.difference(shared))
    only_in_other = iter(other.difference(shared))
    return contentsSet(chain(only_in_self, only_in_other))