Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): this is a fragment from the body of an async function (it uses
# `await`) whose header is not visible here, and the block indentation has been
# lost in extraction -- restore the nesting from the original file before use.
# Purpose: start the background tasks, run the FUSE main loop, tear down the
# filesystem state, and turn an abnormal loop exit into an exception.
# Initialise the block cache, passing the configured `threads` option.
block_cache.init(options.threads)
# Start each background task in `nursery` and immediately register its stop()
# on `cm` (presumably an exit stack that invokes the callbacks on teardown --
# confirm against the enclosing scope).
nursery.start_soon(metadata_upload_task.run, name='metadata-upload-task')
cm.callback(metadata_upload_task.stop)
nursery.start_soon(commit_task.run, name='commit-task')
cm.callback(commit_task.stop)
# Keep the value returned by setup_exchook(); it is checked after the main
# loop and, when truthy, unpacked as (exception instance, traceback).
exc_info = setup_exchook()
# Only when the systemd option is set: import lazily and send READY=1.
if options.systemd:
import systemd.daemon
systemd.daemon.notify('READY=1')
# Run the FUSE main loop; the two destroy() calls sit in `finally`, so they run
# whether the loop returns or raises.
try:
ret = await pyfuse3.main()
finally:
await operations.destroy()
await block_cache.destroy(options.keep_cache)
# A non-None return value is reported as a signal number and treated as an
# error.
if ret is not None:
raise RuntimeError('Received signal %d, terminating' % (ret,))
# Re-raise if the main loop terminated due to an exception in another thread,
# keeping that exception's original traceback.
if exc_info:
(exc_inst, exc_tb) = exc_info
raise exc_inst.with_traceback(exc_tb)
log.info("FUSE main loop terminated.")
# Reached only when nothing above raised (assuming the usual nesting); the
# flag is consumed outside this fragment.
unmount_clean = True
# Start-up sequence for the BashFS FUSE filesystem: parse the command line,
# build the operations object, mount it through pyfuse3, run the main loop
# under trio, and close the pyfuse3 session when the loop ends.
args = parse_args()
if args.debug:
    logging.getLogger().setLevel(logging.DEBUG)

# Fall back to ("bash", "-c") when no argv prefix was supplied.
argv = args.argv_prefix or ("bash", "-c")
# The separator is handed to BashFS as bytes (str.encode() default encoding).
operations = BashFS(argv_prefix=argv, separator=args.separator.encode())

# Start from a copy of pyfuse3's default mount options and adjust it.
fuse_options = set(pyfuse3.default_options)
fuse_options.add("fsname=bashfs")
fuse_options.discard("default_permissions")
if args.debug_fuse:
    fuse_options.add("debug")

pyfuse3.init(operations, args.mountpoint, fuse_options)
try:
    trio.run(pyfuse3.main)
except BaseException:
    # Deliberate catch-all (same reach as the original bare ``except``):
    # whatever ends the loop, including KeyboardInterrupt, the session is
    # closed with unmount requested before the exception propagates unchanged.
    pyfuse3.close(unmount=True)
    raise
pyfuse3.close()