feat(tvix/store): fix ctrl-c handling on mount command

This enables the tokio `signal` feature, and registers a ctrl_c signal
handler, which will use the unmount handle to unmount in case a ctrl-c
signal is received.

This avoids having disconnected mountpoints when Ctrl-C'ing a
`tvix-store mount` invocation.

In case the filesystem is unmounted externally (via `umount /path/to/
mountpoint`), the future is waiting for the signal is never resolved and
the task is stopped.

Change-Id: I149f705a6cb50188177f2a6c6a5fcd77218e2a3f
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9218
Reviewed-by: tazjin <tazjin@tvl.su>
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
This commit is contained in:
Florian Klink 2023-09-03 17:39:44 +03:00 committed by clbot
parent f9b5fc49b1
commit f499d2e031
4 changed files with 60 additions and 6 deletions

10
tvix/Cargo.lock generated
View file

@ -2116,6 +2116,15 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1"
dependencies = [
"libc",
]
[[package]]
name = "slab"
version = "0.4.8"
@ -2433,6 +2442,7 @@ dependencies = [
"mio",
"num_cpus",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.48.0",

View file

@ -6092,6 +6092,23 @@ rec {
"loom" = [ "dep:loom" ];
};
};
"signal-hook-registry" = rec {
crateName = "signal-hook-registry";
version = "1.4.1";
edition = "2015";
sha256 = "18crkkw5k82bvcx088xlf5g4n3772m24qhzgfan80nda7d3rn8nq";
authors = [
"Michal 'vorner' Vaner <vorner@vorner.cz>"
"Masaki Hara <ackie.h.gmai@gmail.com>"
];
dependencies = [
{
name = "libc";
packageId = "libc";
}
];
};
"slab" = rec {
crateName = "slab";
version = "0.4.8";
@ -6924,6 +6941,12 @@ rec {
name = "pin-project-lite";
packageId = "pin-project-lite";
}
{
name = "signal-hook-registry";
packageId = "signal-hook-registry";
optional = true;
target = { target, features }: (target."unix" or false);
}
{
name = "socket2";
packageId = "socket2";
@ -6987,7 +7010,7 @@ rec {
"tracing" = [ "dep:tracing" ];
"windows-sys" = [ "dep:windows-sys" ];
};
resolvedDefaultFeatures = [ "bytes" "default" "io-std" "io-util" "libc" "macros" "mio" "net" "num_cpus" "rt" "rt-multi-thread" "socket2" "sync" "time" "tokio-macros" "windows-sys" ];
resolvedDefaultFeatures = [ "bytes" "default" "io-std" "io-util" "libc" "macros" "mio" "net" "num_cpus" "rt" "rt-multi-thread" "signal" "signal-hook-registry" "socket2" "sync" "time" "tokio-macros" "windows-sys" ];
};
"tokio-io-timeout" = rec {
crateName = "tokio-io-timeout";
@ -8363,7 +8386,7 @@ rec {
{
name = "tokio";
packageId = "tokio";
features = [ "rt-multi-thread" "net" ];
features = [ "net" "rt-multi-thread" "signal" ];
}
{
name = "tokio-stream";

View file

@ -17,7 +17,7 @@ sha2 = "0.10.6"
sled = { version = "0.34.7", features = ["compression"] }
thiserror = "1.0.38"
tokio-stream = "0.1.14"
tokio = { version = "1.28.0", features = ["rt-multi-thread", "net"] }
tokio = { version = "1.28.0", features = ["net", "rt-multi-thread", "signal"] }
tonic = "0.8.2"
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.16", features = ["json"] }

View file

@ -264,16 +264,37 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
directory_service.clone(),
)?;
tokio::task::spawn_blocking(move || {
let mut fuse_session = tokio::task::spawn_blocking(move || {
let f = FUSE::new(
blob_service,
directory_service,
path_info_service,
list_root,
);
fuser::mount2(f, &dest, &[])
fuser::Session::new(f, &dest, &[])
})
.await??
.await??;
// grab a handle to unmount the file system, and register a signal
// handler.
let mut fuse_unmounter = fuse_session.unmount_callable();
tokio::spawn(async move {
tokio::signal::ctrl_c().await.unwrap();
info!("interrupt received, unmounting…");
fuse_unmounter.unmount().unwrap();
});
// Start the fuse filesystem and wait for its completion, which
// happens when it's unmounted externally, or via the signal handler
// task.
tokio::task::spawn_blocking(move || -> io::Result<()> {
info!("mounting tvix-store on {:?}", fuse_session.mountpoint());
let res = fuse_session.run()?;
info!("unmount occured, terminating…");
Ok(res)
})
.await??;
}
};
Ok(())