Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_getxattr(ctx):
(fh, inode) = await ctx.server.create(ROOT_INODE, newname(ctx),
file_mode(), os.O_RDWR, some_ctx)
await ctx.server.release(fh)
with assert_raises(FUSEError):
await ctx.server.getxattr(inode.st_ino, b'nonexistant-attr', some_ctx)
await ctx.server.setxattr(inode.st_ino, b'my-attr', b'strabumm!', some_ctx)
assert await ctx.server.getxattr(inode.st_ino, b'my-attr', some_ctx) == b'strabumm!'
await ctx.server.forget([(inode.st_ino, 1)])
await fsck(ctx)
await ctx.server.release(fh)
# Create
with assert_raises(FUSEError) as cm:
ctx.server._create(inode2.st_ino, b'dir1', dir_mode(), os.O_RDWR, some_ctx)
assert cm.value.errno == errno.EPERM
# Setattr
attr = await ctx.server.getattr(inode2a.st_ino, some_ctx)
with assert_raises(FUSEError) as cm:
await ctx.server.setattr(inode2a.st_ino, attr, SetattrFields(update_mtime=True),
None, some_ctx)
assert cm.value.errno == errno.EPERM
# xattr
with assert_raises(FUSEError) as cm:
await ctx.server.setxattr(inode2.st_ino, b'name', b'value', some_ctx)
assert cm.value.errno == errno.EPERM
with assert_raises(FUSEError) as cm:
await ctx.server.removexattr(inode2.st_ino, b'name', some_ctx)
assert cm.value.errno == errno.EPERM
await ctx.server.forget(list(ctx.server.open_inodes.items()))
await fsck(ctx)
def _create(self, id_p, name, mode, ctx, rdev=0, size=0):
if name == CTRL_NAME:
log.warning('Attempted to create s3ql control file at %s',
get_path(id_p, self.db, name))
raise FUSEError(errno.EACCES)
now_ns = time_ns()
inode_p = self.inodes[id_p]
if inode_p.locked:
raise FUSEError(errno.EPERM)
if inode_p.refcount == 0:
log.warning('Attempted to create entry %s with unlinked parent %d',
name, id_p)
raise FUSEError(errno.EINVAL)
inode_p.mtime_ns = now_ns
inode_p.ctime_ns = now_ns
if inode_p.mode & stat.S_ISGID:
gid = inode_p.gid
if stat.S_ISDIR(mode):
mode |= stat.S_ISGID
else:
gid = ctx.gid
inode = self.inodes.create_inode(mtime_ns=now_ns, ctime_ns=now_ns, atime_ns=now_ns,
uid=ctx.uid, gid=gid, mode=mode, refcount=1,
rdev=rdev, size=size)
self.db.execute("INSERT INTO contents(name_id, inode, parent_inode) VALUES(?,?,?)",
(self._add_name(name), inode.id, id_p))
object.__setattr__(inode, 'gid', os.getgid())
elif name == '.':
inode = self.inodes[id_p]
elif name == '..':
id_ = self.db.get_val("SELECT parent_inode FROM contents WHERE inode=?",
(id_p,))
inode = self.inodes[id_]
else:
try:
id_ = self.db.get_val("SELECT inode FROM contents_v WHERE name=? AND parent_inode=?",
(name, id_p))
except NoSuchRowError:
raise FUSEError(errno.ENOENT)
inode = self.inodes[id_]
self.open_inodes[inode.id] += 1
return inode
def _create(self, id_p, name, mode, ctx, rdev=0, size=0):
if name == CTRL_NAME:
log.warning('Attempted to create s3ql control file at %s',
get_path(id_p, self.db, name))
raise FUSEError(errno.EACCES)
now_ns = time_ns()
inode_p = self.inodes[id_p]
if inode_p.locked:
raise FUSEError(errno.EPERM)
if inode_p.refcount == 0:
log.warning('Attempted to create entry %s with unlinked parent %d',
name, id_p)
raise FUSEError(errno.EINVAL)
inode_p.mtime_ns = now_ns
inode_p.ctime_ns = now_ns
if inode_p.mode & stat.S_ISGID:
gid = inode_p.gid
`id_` must be the inode of `name`. If `force` is True, then
the `locked` attribute is ignored.
'''
log.debug('started with %d, %r', id_p, name)
now_ns = time_ns()
# Check that there are no child entries
if self.db.has_val("SELECT 1 FROM contents WHERE parent_inode=?", (id_,)):
log.debug("Attempted to remove entry with children: %s",
get_path(id_p, self.db, name))
raise FUSEError(errno.ENOTEMPTY)
if self.inodes[id_p].locked and not force:
raise FUSEError(errno.EPERM)
name_id = self._del_name(name)
self.db.execute("DELETE FROM contents WHERE name_id=? AND parent_inode=?",
(name_id, id_p))
inode = self.inodes[id_]
inode.refcount -= 1
inode.ctime_ns = now_ns
inode_p = self.inodes[id_p]
inode_p.mtime_ns = now_ns
inode_p.ctime_ns = now_ns
if inode.refcount == 0 and id_ not in self.open_inodes:
log.debug('removing from cache')
await self.cache.remove(id_, 0, int(math.ceil(inode.size / self.max_obj_size)))
async def _remove(self, id_p, name, id_, force=False):
'''Remove entry `name` with parent inode `id_p`
`id_` must be the inode of `name`. If `force` is True, then
the `locked` attribute is ignored.
'''
log.debug('started with %d, %r', id_p, name)
now_ns = time_ns()
# Check that there are no child entries
if self.db.has_val("SELECT 1 FROM contents WHERE parent_inode=?", (id_,)):
log.debug("Attempted to remove entry with children: %s",
get_path(id_p, self.db, name))
raise FUSEError(errno.ENOTEMPTY)
if self.inodes[id_p].locked and not force:
raise FUSEError(errno.EPERM)
name_id = self._del_name(name)
self.db.execute("DELETE FROM contents WHERE name_id=? AND parent_inode=?",
(name_id, id_p))
inode = self.inodes[id_]
inode.refcount -= 1
inode.ctime_ns = now_ns
inode_p = self.inodes[id_p]
inode_p.mtime_ns = now_ns
inode_p.ctime_ns = now_ns
async def removexattr(self, id_, name, ctx):
log.debug('started with %d, %r', id_, name)
if self.failsafe or self.inodes[id_].locked:
raise FUSEError(errno.EPERM)
try:
name_id = self._del_name(name)
except NoSuchRowError:
raise FUSEError(pyfuse3.ENOATTR)
changes = self.db.execute('DELETE FROM ext_attributes WHERE inode=? AND name_id=?',
(id_, name_id))
if changes == 0:
raise FUSEError(pyfuse3.ENOATTR)
self.inodes[id_].ctime_ns = time_ns()
async def rename(self, id_p_old, name_old, id_p_new, name_new, flags, ctx):
if flags:
raise FUSEError(errno.ENOTSUP)
log.debug('started with %d, %r, %d, %r', id_p_old, name_old, id_p_new, name_new)
if name_new == CTRL_NAME or name_old == CTRL_NAME:
log.warning('Attempted to rename s3ql control file (%s -> %s)',
get_path(id_p_old, self.db, name_old),
get_path(id_p_new, self.db, name_new))
raise FUSEError(errno.EACCES)
if (self.failsafe or self.inodes[id_p_old].locked
or self.inodes[id_p_new].locked):
raise FUSEError(errno.EPERM)
inode_old = self._lookup(id_p_old, name_old, ctx)
try: