Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def tst_bug382(self):
    """Check that an ``rmtree`` control command removes a directory.

    A fresh directory is created below the mount point.  The ``rmtree``
    extended attribute is then set on the control file with a payload
    naming the root inode and the directory (as bytes).  Afterwards the
    directory must be gone for both ``os.stat`` and a directory listing.

    NOTE(review): the name suggests a regression test for issue 382 --
    confirm against the project's tracker.
    """
    name = self.newname()
    path = "/".join((self.mnt_dir, name))

    # Create the directory and confirm it is visible through the mount.
    os.mkdir(path)
    mode = os.stat(path).st_mode
    assert stat.S_ISDIR(mode)
    assert name in pyfuse3.listdir(self.mnt_dir)

    # Payload is the encoded text "(<root inode>, <name as bytes repr>)".
    payload = ('(%d, %r)' % (pyfuse3.ROOT_INODE, path2bytes(name))).encode()
    ctrl_path = '%s/%s' % (self.mnt_dir, CTRL_NAME)
    pyfuse3.setxattr(ctrl_path, 'rmtree', payload)

    # Invalidation is asynchronous, so poll until the path disappears.
    try:
        retry(5, lambda: not os.path.exists(path))
    except RetryTimeoutError:
        # Deliberately ignored: if the path never vanished, the
        # assert_raises call below is what reports the failure.
        pass

    assert_raises(FileNotFoundError, os.stat, path)
    assert name not in pyfuse3.listdir(self.mnt_dir)
from pyfuse3 import ROOT_INODE
# Version string of this release; RELEASE is derived from it.
VERSION = '3.3.2'
RELEASE = '%s' % VERSION

# TODO: On next revision bump, remove upgrade code from backend/comprenc.py and
# backends/s3c.py (activated by UPGRADE_MODE variable).
CURRENT_FS_REV = 24

# Buffer size (in bytes) used when writing objects: 64 KiB.
BUFSIZE = 64 * 1024

# Name and inode of the special s3ql control file.  The control inode is
# the one directly following the root inode.
CTRL_NAME = '.__s3ql__ctrl__'
CTRL_INODE = ROOT_INODE + 1

# Maps each file system revision to the last S3QL version that still
# supported that revision.
REV_VER_MAP = {
    23: '2.26',
    22: '2.16',
    21: '2.13',
    20: '2.9',
    16: '1.15',
    15: '1.10',
    14: '1.8.1',
    13: '1.6',
    12: '1.3',
    11: '1.0.1',
}
def __init__(self, argv_prefix=("bash", "-c"), separator=b"|"):
    """Initialise the operations object and its bookkeeping tables.

    Args:
        argv_prefix: Sequence of command words; a list copy is stored
            as ``self.argv_prefix``.
        separator: Bytes value stored unchanged as ``self.separator``.

    Side effects:
        Sets the process-wide ``SIGCHLD`` disposition to ``SIG_IGN``.
    """
    # Zero-argument super() walks the whole initializer chain.  The former
    # super(pyfuse3.Operations, self) began the MRO lookup *after*
    # pyfuse3.Operations and therefore skipped that class's __init__.
    super().__init__()

    # NOTE(review): presumably so that exited child processes are reaped
    # automatically instead of lingering as zombies -- confirm.
    signal.signal(signal.SIGCHLD, signal.SIG_IGN)

    # Copy so later changes to the caller's sequence do not leak in.
    self.argv_prefix = list(argv_prefix)
    self.separator = separator

    # Independent counters for inode numbers and file ids, both starting
    # at 10.
    self._inode_generator = count(start=10)
    self._file_generator = count(start=10)

    # The inode table starts with just the root node.
    self._inode_map = {
        pyfuse3.ROOT_INODE: Node(None, None, pyfuse3.ROOT_INODE,
                                 is_root=True),
    }
    self._proc_map = {}
def __init__(self, argv_prefix=("bash", "-c"), separator=b"|"):
    """Initialise the operations object and its bookkeeping tables.

    Args:
        argv_prefix: Sequence of command words; a list copy is stored
            as ``self.argv_prefix``.
        separator: Bytes value stored unchanged as ``self.separator``.

    Side effects:
        Sets the process-wide ``SIGCHLD`` disposition to ``SIG_IGN``.
    """
    # Zero-argument super() walks the whole initializer chain.  The former
    # super(pyfuse3.Operations, self) began the MRO lookup *after*
    # pyfuse3.Operations and therefore skipped that class's __init__.
    super().__init__()

    # NOTE(review): presumably so that exited child processes are reaped
    # automatically instead of lingering as zombies -- confirm.
    signal.signal(signal.SIGCHLD, signal.SIG_IGN)

    # Copy so later changes to the caller's sequence do not leak in.
    self.argv_prefix = list(argv_prefix)
    self.separator = separator

    # Independent counters for inode numbers and file ids, both starting
    # at 10.
    self._inode_generator = count(start=10)
    self._file_generator = count(start=10)

    # The inode table starts with just the root node.
    self._inode_map = {
        pyfuse3.ROOT_INODE: Node(None, None, pyfuse3.ROOT_INODE,
                                 is_root=True),
    }
    self._proc_map = {}
def __init__(self, block_cache, db, max_obj_size, inode_cache,
             upload_event=None):
    """Store the supplied collaborators and set up per-instance state.

    Args:
        block_cache: Stored as ``self.cache``.
        db: Stored as ``self.db``.
        max_obj_size: Stored as ``self.max_obj_size``.
        inode_cache: Stored as ``self.inodes``.
        upload_event: Optional; stored as ``self.upload_event``
            (defaults to ``None``).
    """
    super().__init__()

    # Objects handed in by the caller.
    self.cache = block_cache
    self.db = db
    self.inodes = inode_cache
    self.max_obj_size = max_obj_size
    self.upload_event = upload_event

    # Starts out disabled.
    self.failsafe = False

    # Maps a key to a set; missing keys yield an empty set.
    self.broken_blocks = collections.defaultdict(set)

    # Per-inode open counter; inodes not seen yet count as zero.
    self.open_inodes = collections.defaultdict(int)
    # Root inode is always open
    self.open_inodes[pyfuse3.ROOT_INODE] += 1