fix(tvix/store/bin): fix shutdown behaviour for FUSE

Both umounts happening from another process, as well as tvix-store
itself calling umount() on FuseDaemon will cause the FUSE worker threads
to terminate.

So far there was no nice way to wait on these threads to be terminated
from multiple places, causing the `tvix-store mount` command to only be
terminated if interrupted via ctrl-c, not via an external umount.

Update FuseDaemon to use a ThreadPool, which gives us a join primitive
over all threads, that can also be called from multiple places.

Await on a join() from there to end the program, not the ctrl-c signal
handler as it was before.

Using FuseDaemon from multiple tasks requires Arc<>-ing both the
ThreadPool as well as the inner FuseSession (which also needs to be
inside a Mutex if we want to unmount), but now we can clone FuseDaemon
around and use it in two places. We could probably also have used an
Option and drop the FuseSession after the first umount, but this looks
cleaner.

Change-Id: Id635ef59b560c111db52ad0b3ca3d12bc7ae28ca
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11825
Reviewed-by: Brian Olsen <me@griff.name>
Tested-by: BuildkiteCI
This commit is contained in:
Florian Klink 2024-06-16 11:43:48 +03:00 committed by flokli
parent 452299dcd2
commit 7e42b4f314
6 changed files with 113 additions and 56 deletions

10
tvix/Cargo.lock generated
View file

@ -3693,6 +3693,15 @@ dependencies = [
"once_cell", "once_cell",
] ]
[[package]]
name = "threadpool"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
dependencies = [
"num_cpus",
]
[[package]] [[package]]
name = "tikv-jemalloc-sys" name = "tikv-jemalloc-sys"
version = "0.5.4+5.3.0-patched" version = "0.5.4+5.3.0-patched"
@ -4230,6 +4239,7 @@ dependencies = [
"sled", "sled",
"tempfile", "tempfile",
"thiserror", "thiserror",
"threadpool",
"tokio", "tokio",
"tokio-retry", "tokio-retry",
"tokio-stream", "tokio-stream",

View file

@ -11293,6 +11293,24 @@ rec {
]; ];
features = { }; features = { };
}; };
"threadpool" = rec {
crateName = "threadpool";
version = "1.8.1";
edition = "2015";
sha256 = "1amgfyzvynbm8pacniivzq9r0fh3chhs7kijic81j76l6c5ycl6h";
authors = [
"The Rust Project Developers"
"Corey Farwell <coreyf@rwell.org>"
"Stefan Schindler <dns2utf8@estada.ch>"
];
dependencies = [
{
name = "num_cpus";
packageId = "num_cpus";
}
];
};
"tikv-jemalloc-sys" = rec { "tikv-jemalloc-sys" = rec {
crateName = "tikv-jemalloc-sys"; crateName = "tikv-jemalloc-sys";
version = "0.5.4+5.3.0-patched"; version = "0.5.4+5.3.0-patched";
@ -13319,6 +13337,11 @@ rec {
name = "thiserror"; name = "thiserror";
packageId = "thiserror"; packageId = "thiserror";
} }
{
name = "threadpool";
packageId = "threadpool";
optional = true;
}
{ {
name = "tokio"; name = "tokio";
packageId = "tokio"; packageId = "tokio";
@ -13445,7 +13468,7 @@ rec {
features = { features = {
"cloud" = [ "dep:bigtable_rs" "object_store/aws" "object_store/azure" "object_store/gcp" ]; "cloud" = [ "dep:bigtable_rs" "object_store/aws" "object_store/azure" "object_store/gcp" ];
"default" = [ "cloud" ]; "default" = [ "cloud" ];
"fs" = [ "dep:libc" "dep:fuse-backend-rs" ]; "fs" = [ "dep:fuse-backend-rs" "dep:threadpool" "dep:libc" ];
"fuse" = [ "fs" ]; "fuse" = [ "fs" ];
"tonic-reflection" = [ "dep:tonic-reflection" ]; "tonic-reflection" = [ "dep:tonic-reflection" ];
"virtiofs" = [ "fs" "dep:vhost" "dep:vhost-user-backend" "dep:virtio-queue" "dep:vm-memory" "dep:vmm-sys-util" "dep:virtio-bindings" "fuse-backend-rs?/vhost-user-fs" "fuse-backend-rs?/virtiofs" ]; "virtiofs" = [ "fs" "dep:vhost" "dep:vhost-user-backend" "dep:virtio-queue" "dep:vm-memory" "dep:vmm-sys-util" "dep:virtio-bindings" "fuse-backend-rs?/vhost-user-fs" "fuse-backend-rs?/virtiofs" ];

View file

@ -51,6 +51,10 @@ version = "0.11.0"
optional = true optional = true
version = "0.2.144" version = "0.2.144"
[dependencies.threadpool]
version = "1.8.1"
optional = true
[dependencies.tonic-reflection] [dependencies.tonic-reflection]
optional = true optional = true
version = "0.11.0" version = "0.11.0"
@ -100,7 +104,7 @@ cloud = [
"object_store/azure", "object_store/azure",
"object_store/gcp", "object_store/gcp",
] ]
fs = ["dep:libc", "dep:fuse-backend-rs"] fs = ["dep:fuse-backend-rs", "dep:threadpool", "dep:libc"]
virtiofs = [ virtiofs = [
"fs", "fs",
"dep:vhost", "dep:vhost",

View file

@ -1,6 +1,8 @@
use std::{io, path::Path, sync::Arc, thread}; use std::{io, path::Path, sync::Arc};
use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession}; use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession};
use parking_lot::Mutex;
use threadpool::ThreadPool;
use tracing::{error, instrument}; use tracing::{error, instrument};
#[cfg(test)] #[cfg(test)]
@ -49,9 +51,12 @@ where
} }
} }
/// Starts a [Filesystem] with the specified number of threads, and provides
/// functions to unmount, and wait for it to have completed.
#[derive(Clone)]
pub struct FuseDaemon { pub struct FuseDaemon {
session: FuseSession, session: Arc<Mutex<FuseSession>>,
threads: Vec<thread::JoinHandle<()>>, threads: Arc<ThreadPool>,
} }
impl FuseDaemon { impl FuseDaemon {
@ -59,7 +64,7 @@ impl FuseDaemon {
pub fn new<FS, P>( pub fn new<FS, P>(
fs: FS, fs: FS,
mountpoint: P, mountpoint: P,
threads: usize, num_threads: usize,
allow_other: bool, allow_other: bool,
) -> Result<Self, io::Error> ) -> Result<Self, io::Error>
where where
@ -76,40 +81,49 @@ impl FuseDaemon {
session session
.mount() .mount()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
let mut join_handles = Vec::with_capacity(threads);
for _ in 0..threads { // construct a thread pool
let threads = threadpool::Builder::new()
.num_threads(num_threads)
.thread_name("fuse_server".to_string())
.build();
for _ in 0..num_threads {
// for each thread requested, create and start a FuseServer accepting requests.
let mut server = FuseServer { let mut server = FuseServer {
server: server.clone(), server: server.clone(),
channel: session channel: session
.new_channel() .new_channel()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?,
}; };
let join_handle = thread::Builder::new()
.name("fuse_server".to_string()) threads.execute(move || {
.spawn(move || {
let _ = server.start(); let _ = server.start();
})?; });
join_handles.push(join_handle);
} }
Ok(FuseDaemon { Ok(FuseDaemon {
session, session: Arc::new(Mutex::new(session)),
threads: join_handles, threads: Arc::new(threads),
}) })
} }
/// Waits for all threads to finish.
#[instrument(skip_all)]
pub fn wait(&self) {
self.threads.join()
}
/// Send the unmount command, and waits for all threads to finish.
#[instrument(skip_all, err)] #[instrument(skip_all, err)]
pub fn unmount(&mut self) -> Result<(), io::Error> { pub fn unmount(&self) -> Result<(), io::Error> {
// Send the unmount command.
self.session self.session
.lock()
.umount() .umount()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
for thread in self.threads.drain(..) { self.wait();
thread.join().map_err(|_| {
io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread")
})?;
}
Ok(()) Ok(())
} }
} }

View file

@ -248,7 +248,7 @@ async fn mount() {
let (blob_service, directory_service) = gen_svcs(); let (blob_service, directory_service) = gen_svcs();
let mut fuse_daemon = do_mount( let fuse_daemon = do_mount(
blob_service, blob_service,
directory_service, directory_service,
BTreeMap::default(), BTreeMap::default(),
@ -271,7 +271,7 @@ async fn root() {
let tmpdir = TempDir::new().unwrap(); let tmpdir = TempDir::new().unwrap();
let (blob_service, directory_service) = gen_svcs(); let (blob_service, directory_service) = gen_svcs();
let mut fuse_daemon = do_mount( let fuse_daemon = do_mount(
blob_service, blob_service,
directory_service, directory_service,
BTreeMap::default(), BTreeMap::default(),
@ -305,7 +305,7 @@ async fn root_with_listing() {
populate_blob_a(&blob_service, &mut root_nodes).await; populate_blob_a(&blob_service, &mut root_nodes).await;
let mut fuse_daemon = do_mount( let fuse_daemon = do_mount(
blob_service, blob_service,
directory_service, directory_service,
root_nodes, root_nodes,
@ -349,7 +349,7 @@ async fn stat_file_at_root() {
populate_blob_a(&blob_service, &mut root_nodes).await; populate_blob_a(&blob_service, &mut root_nodes).await;
let mut fuse_daemon = do_mount( let fuse_daemon = do_mount(
blob_service, blob_service,
directory_service, directory_service,
root_nodes, root_nodes,
@ -386,7 +386,7 @@ async fn read_file_at_root() {
populate_blob_a(&blob_service, &mut root_nodes).await; populate_blob_a(&blob_service, &mut root_nodes).await;
let mut fuse_daemon = do_mount( let fuse_daemon = do_mount(
blob_service, blob_service,
directory_service, directory_service,
root_nodes, root_nodes,
@ -423,7 +423,7 @@ async fn read_large_file_at_root() {
populate_blob_b(&blob_service, &mut root_nodes).await; populate_blob_b(&blob_service, &mut root_nodes).await;
let mut fuse_daemon = do_mount( let fuse_daemon = do_mount(
blob_service, blob_service,
directory_service, directory_service,
root_nodes, root_nodes,
@ -468,7 +468,7 @@ async fn symlink_readlink() {
populate_symlink(&mut root_nodes).await; populate_symlink(&mut root_nodes).await;
let mut fuse_daemon = do_mount( let fuse_daemon = do_mount(
blob_service, blob_service,
directory_service, directory_service,
root_nodes, root_nodes,
@ -515,7 +515,7 @@ async fn read_stat_through_symlink() {
populate_blob_a(&blob_service, &mut root_nodes).await; populate_blob_a(&blob_service, &mut root_nodes).await;
populate_symlink(&mut root_nodes).await; populate_symlink(&mut root_nodes).await;
let mut fuse_daemon = do_mount( let fuse_daemon = do_mount(
blob_service, blob_service,
directory_service, directory_service,
root_nodes, root_nodes,
@ -560,7 +560,7 @@ async fn read_stat_directory() {
populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await;
let mut fuse_daemon = do_mount( let fuse_daemon = do_mount(
blob_service, blob_service,
directory_service, directory_service,
root_nodes, root_nodes,
@ -597,7 +597,7 @@ async fn xattr() {
populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await;
populate_blob_a(&blob_service, &mut root_nodes).await; populate_blob_a(&blob_service, &mut root_nodes).await;
let mut fuse_daemon = do_mount( let fuse_daemon = do_mount(
blob_service, blob_service,
directory_service, directory_service,
root_nodes, root_nodes,
@ -680,7 +680,7 @@ async fn read_blob_inside_dir() {
populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await;
let mut fuse_daemon = do_mount( let fuse_daemon = do_mount(
blob_service, blob_service,
directory_service, directory_service,
root_nodes, root_nodes,
@ -720,7 +720,7 @@ async fn read_blob_deep_inside_dir() {
populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
let mut fuse_daemon = do_mount( let fuse_daemon = do_mount(
blob_service, blob_service,
directory_service, directory_service,
root_nodes, root_nodes,
@ -763,7 +763,7 @@ async fn readdir() {
populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
let mut fuse_daemon = do_mount( let fuse_daemon = do_mount(
blob_service, blob_service,
directory_service, directory_service,
root_nodes, root_nodes,
@ -823,7 +823,7 @@ async fn readdir_deep() {
populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
let mut fuse_daemon = do_mount( let fuse_daemon = do_mount(
blob_service, blob_service,
directory_service, directory_service,
root_nodes, root_nodes,
@ -873,7 +873,7 @@ async fn check_attributes() {
populate_symlink(&mut root_nodes).await; populate_symlink(&mut root_nodes).await;
populate_blob_helloworld(&blob_service, &mut root_nodes).await; populate_blob_helloworld(&blob_service, &mut root_nodes).await;
let mut fuse_daemon = do_mount( let fuse_daemon = do_mount(
blob_service, blob_service,
directory_service, directory_service,
root_nodes, root_nodes,
@ -948,7 +948,7 @@ async fn compare_inodes_directories() {
populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await;
populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
let mut fuse_daemon = do_mount( let fuse_daemon = do_mount(
blob_service, blob_service,
directory_service, directory_service,
root_nodes, root_nodes,
@ -992,7 +992,7 @@ async fn compare_inodes_files() {
populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
let mut fuse_daemon = do_mount( let fuse_daemon = do_mount(
blob_service, blob_service,
directory_service, directory_service,
root_nodes, root_nodes,
@ -1041,7 +1041,7 @@ async fn compare_inodes_symlinks() {
populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
populate_symlink2(&mut root_nodes).await; populate_symlink2(&mut root_nodes).await;
let mut fuse_daemon = do_mount( let fuse_daemon = do_mount(
blob_service, blob_service,
directory_service, directory_service,
root_nodes, root_nodes,
@ -1084,7 +1084,7 @@ async fn read_wrong_paths_in_root() {
populate_blob_a(&blob_service, &mut root_nodes).await; populate_blob_a(&blob_service, &mut root_nodes).await;
let mut fuse_daemon = do_mount( let fuse_daemon = do_mount(
blob_service, blob_service,
directory_service, directory_service,
root_nodes, root_nodes,
@ -1139,7 +1139,7 @@ async fn disallow_writes() {
let (blob_service, directory_service) = gen_svcs(); let (blob_service, directory_service) = gen_svcs();
let root_nodes = BTreeMap::default(); let root_nodes = BTreeMap::default();
let mut fuse_daemon = do_mount( let fuse_daemon = do_mount(
blob_service, blob_service,
directory_service, directory_service,
root_nodes, root_nodes,
@ -1171,7 +1171,7 @@ async fn missing_directory() {
populate_directorynode_without_directory(&mut root_nodes).await; populate_directorynode_without_directory(&mut root_nodes).await;
let mut fuse_daemon = do_mount( let fuse_daemon = do_mount(
blob_service, blob_service,
directory_service, directory_service,
root_nodes, root_nodes,
@ -1219,7 +1219,7 @@ async fn missing_blob() {
populate_filenode_without_blob(&mut root_nodes).await; populate_filenode_without_blob(&mut root_nodes).await;
let mut fuse_daemon = do_mount( let fuse_daemon = do_mount(
blob_service, blob_service,
directory_service, directory_service,
root_nodes, root_nodes,

View file

@ -452,7 +452,7 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> {
) )
.await?; .await?;
let mut fuse_daemon = tokio::task::spawn_blocking(move || { let fuse_daemon = tokio::task::spawn_blocking(move || {
let fs = make_fs( let fs = make_fs(
blob_service, blob_service,
directory_service, directory_service,
@ -466,16 +466,22 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> {
}) })
.await??; .await??;
// grab a handle to unmount the file system, and register a signal // Wait for a ctrl_c and then call fuse_daemon.unmount().
// handler. tokio::spawn({
tokio::spawn(async move { let fuse_daemon = fuse_daemon.clone();
async move {
tokio::signal::ctrl_c().await.unwrap(); tokio::signal::ctrl_c().await.unwrap();
info!("interrupt received, unmounting…"); info!("interrupt received, unmounting…");
tokio::task::spawn_blocking(move || fuse_daemon.unmount()).await??; tokio::task::spawn_blocking(move || fuse_daemon.unmount()).await??;
info!("unmount occured, terminating…"); info!("unmount occured, terminating…");
Ok::<_, std::io::Error>(()) Ok::<_, std::io::Error>(())
}) }
.await??; });
// Wait for the server to finish, which can either happen through it
// being unmounted externally, or receiving a signal invoking the
// handler above.
tokio::task::spawn_blocking(move || fuse_daemon.wait()).await?
} }
#[cfg(feature = "virtiofs")] #[cfg(feature = "virtiofs")]
Commands::VirtioFs { Commands::VirtioFs {