refactor(tvix/store/fs): Separate FUSE and filesystem code

In prepration for adding virtiofs support, I thought it would make sense
to split out the filesystem implementation from FUSE itself.

The `fs` module holds the tvix-store filesystem implemetation and the
`fuse` module holds the code to spawn a FUSE daemon backed by multiple
threads.

Change-Id: I8c58447b8c3aa016a613068f8e7ec166554e237c
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9343
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Autosubmit: Connor Brewster <cbrewster@hey.com>
This commit is contained in:
Connor Brewster 2023-09-16 13:54:10 -05:00 committed by clbot
parent 6b7c936bc5
commit 7e737fde34
10 changed files with 141 additions and 133 deletions

View file

@ -8859,11 +8859,12 @@ rec {
]; ];
features = { features = {
"default" = [ "fuse" "reflection" ]; "default" = [ "fuse" "reflection" ];
"fuse" = [ "dep:libc" "dep:fuse-backend-rs" ]; "fs" = [ "dep:libc" "dep:fuse-backend-rs" ];
"fuse" = [ "fs" ];
"reflection" = [ "tonic-reflection" ]; "reflection" = [ "tonic-reflection" ];
"tonic-reflection" = [ "dep:tonic-reflection" ]; "tonic-reflection" = [ "dep:tonic-reflection" ];
}; };
resolvedDefaultFeatures = [ "default" "fuse" "reflection" "tonic-reflection" ]; resolvedDefaultFeatures = [ "default" "fs" "fuse" "reflection" "tonic-reflection" ];
}; };
"typenum" = rec { "typenum" = rec {
crateName = "typenum"; crateName = "typenum";

View file

@ -58,5 +58,6 @@ tonic-mock = { git = "https://github.com/brainrake/tonic-mock", branch = "bump-d
[features] [features]
default = ["fuse", "reflection"] default = ["fuse", "reflection"]
fuse = ["dep:libc", "dep:fuse-backend-rs"] fs = ["dep:libc", "dep:fuse-backend-rs"]
fuse = ["fs"]
reflection = ["tonic-reflection"] reflection = ["tonic-reflection"]

View file

@ -22,7 +22,12 @@ use tvix_store::proto::GRPCPathInfoServiceWrapper;
use tvix_store::proto::NamedNode; use tvix_store::proto::NamedNode;
use tvix_store::proto::NarInfo; use tvix_store::proto::NarInfo;
use tvix_store::proto::PathInfo; use tvix_store::proto::PathInfo;
use tvix_store::{FuseDaemon, FUSE};
#[cfg(feature = "fs")]
use tvix_store::fs::TvixStoreFs;
#[cfg(feature = "fuse")]
use tvix_store::fs::fuse::FuseDaemon;
#[cfg(feature = "reflection")] #[cfg(feature = "reflection")]
use tvix_store::proto::FILE_DESCRIPTOR_SET; use tvix_store::proto::FILE_DESCRIPTOR_SET;
@ -302,7 +307,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
)?; )?;
let mut fuse_daemon = tokio::task::spawn_blocking(move || { let mut fuse_daemon = tokio::task::spawn_blocking(move || {
let f = FUSE::new( let f = TvixStoreFs::new(
blob_service, blob_service,
directory_service, directory_service,
path_info_service, path_info_service,

113
tvix/store/src/fs/fuse.rs Normal file
View file

@ -0,0 +1,113 @@
use std::{io, path::Path, sync::Arc, thread};
use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession};
use tracing::error;
struct FuseServer<FS>
where
FS: FileSystem + Sync + Send,
{
server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>,
channel: fuse_backend_rs::transport::FuseChannel,
}
impl<FS> FuseServer<FS>
where
FS: FileSystem + Sync + Send,
{
fn start(&mut self) -> io::Result<()> {
loop {
if let Some((reader, writer)) = self
.channel
.get_request()
.map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))?
{
if let Err(e) = self
.server
.handle_message(reader, writer.into(), None, None)
{
match e {
// This indicates the session has been shut down.
fuse_backend_rs::Error::EncodeMessage(e)
if e.raw_os_error() == Some(libc::EBADFD) =>
{
break;
}
error => {
error!(?error, "failed to handle fuse request");
continue;
}
}
}
} else {
break;
}
}
Ok(())
}
}
pub struct FuseDaemon {
session: FuseSession,
threads: Vec<thread::JoinHandle<()>>,
}
impl FuseDaemon {
pub fn new<FS, P>(fs: FS, mountpoint: P, threads: usize) -> Result<Self, io::Error>
where
FS: FileSystem + Sync + Send + 'static,
P: AsRef<Path>,
{
let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs)));
let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
session.set_allow_other(false);
session
.mount()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
let mut join_handles = Vec::with_capacity(threads);
for _ in 0..threads {
let mut server = FuseServer {
server: server.clone(),
channel: session
.new_channel()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?,
};
let join_handle = thread::Builder::new()
.name("fuse_server".to_string())
.spawn(move || {
let _ = server.start();
})?;
join_handles.push(join_handle);
}
Ok(FuseDaemon {
session,
threads: join_handles,
})
}
pub fn unmount(&mut self) -> Result<(), io::Error> {
self.session
.umount()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
for thread in self.threads.drain(..) {
thread.join().map_err(|_| {
io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread")
})?;
}
Ok(())
}
}
impl Drop for FuseDaemon {
fn drop(&mut self) {
if let Err(error) = self.unmount() {
error!(?error, "failed to unmont fuse filesystem")
}
}
}

View file

@ -197,7 +197,7 @@ impl InodeTracker {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::fuse::inodes::DirectoryInodeData; use crate::fs::inodes::DirectoryInodeData;
use crate::proto; use crate::proto;
use crate::tests::fixtures; use crate::tests::fixtures;

View file

@ -2,39 +2,37 @@ mod file_attr;
mod inode_tracker; mod inode_tracker;
mod inodes; mod inodes;
#[cfg(feature = "fuse")]
pub mod fuse;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
use crate::{ use crate::{
blobservice::{BlobReader, BlobService}, blobservice::{BlobReader, BlobService},
directoryservice::DirectoryService, directoryservice::DirectoryService,
fuse::inodes::{DirectoryInodeData, InodeData},
pathinfoservice::PathInfoService, pathinfoservice::PathInfoService,
proto::{node::Node, NamedNode}, proto::{node::Node, NamedNode},
B3Digest, Error, B3Digest, Error,
}; };
use fuse_backend_rs::{ use fuse_backend_rs::api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID};
api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID},
transport::FuseSession,
};
use nix_compat::store_path::StorePath; use nix_compat::store_path::StorePath;
use parking_lot::RwLock; use parking_lot::RwLock;
use std::{ use std::{
collections::HashMap, collections::HashMap,
io, io,
path::Path,
str::FromStr, str::FromStr,
sync::atomic::AtomicU64, sync::atomic::AtomicU64,
sync::{atomic::Ordering, Arc}, sync::{atomic::Ordering, Arc},
thread,
time::Duration, time::Duration,
}; };
use tokio::io::{AsyncBufReadExt, AsyncSeekExt}; use tokio::io::{AsyncBufReadExt, AsyncSeekExt};
use tracing::{debug, error, info_span, warn}; use tracing::{debug, info_span, warn};
use self::{ use self::{
file_attr::{gen_file_attr, ROOT_FILE_ATTR}, file_attr::{gen_file_attr, ROOT_FILE_ATTR},
inode_tracker::InodeTracker, inode_tracker::InodeTracker,
inodes::{DirectoryInodeData, InodeData},
}; };
/// This implements a read-only FUSE filesystem for a tvix-store /// This implements a read-only FUSE filesystem for a tvix-store
@ -71,7 +69,7 @@ use self::{
/// Due to the above being valid across the whole store, and considering the /// Due to the above being valid across the whole store, and considering the
/// merkle structure is a DAG, not a tree, this also means we can't do "bucketed /// merkle structure is a DAG, not a tree, this also means we can't do "bucketed
/// allocation", aka reserve Directory.size inodes for each PathInfo. /// allocation", aka reserve Directory.size inodes for each PathInfo.
pub struct FUSE { pub struct TvixStoreFs {
blob_service: Arc<dyn BlobService>, blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>, directory_service: Arc<dyn DirectoryService>,
path_info_service: Arc<dyn PathInfoService>, path_info_service: Arc<dyn PathInfoService>,
@ -93,7 +91,7 @@ pub struct FUSE {
tokio_handle: tokio::runtime::Handle, tokio_handle: tokio::runtime::Handle,
} }
impl FUSE { impl TvixStoreFs {
pub fn new( pub fn new(
blob_service: Arc<dyn BlobService>, blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>, directory_service: Arc<dyn DirectoryService>,
@ -224,7 +222,7 @@ impl FUSE {
} }
} }
impl FileSystem for FUSE { impl FileSystem for TvixStoreFs {
type Inode = u64; type Inode = u64;
type Handle = u64; type Handle = u64;
@ -660,112 +658,3 @@ impl FileSystem for FUSE {
} }
} }
} }
struct FuseServer<FS>
where
FS: FileSystem + Sync + Send,
{
server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>,
channel: fuse_backend_rs::transport::FuseChannel,
}
impl<FS> FuseServer<FS>
where
FS: FileSystem + Sync + Send,
{
fn start(&mut self) -> io::Result<()> {
loop {
if let Some((reader, writer)) = self
.channel
.get_request()
.map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))?
{
if let Err(e) = self
.server
.handle_message(reader, writer.into(), None, None)
{
match e {
// This indicates the session has been shut down.
fuse_backend_rs::Error::EncodeMessage(e)
if e.raw_os_error() == Some(libc::EBADFD) =>
{
break;
}
error => {
error!(?error, "failed to handle fuse request");
continue;
}
}
}
} else {
break;
}
}
Ok(())
}
}
pub struct FuseDaemon {
session: FuseSession,
threads: Vec<thread::JoinHandle<()>>,
}
impl FuseDaemon {
pub fn new<FS, P>(fs: FS, mountpoint: P, threads: usize) -> Result<Self, io::Error>
where
FS: FileSystem + Sync + Send + 'static,
P: AsRef<Path>,
{
let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs)));
let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
session.set_allow_other(false);
session
.mount()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
let mut join_handles = Vec::with_capacity(threads);
for _ in 0..threads {
let mut server = FuseServer {
server: server.clone(),
channel: session
.new_channel()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?,
};
let join_handle = thread::Builder::new()
.name("fuse_server".to_string())
.spawn(move || {
let _ = server.start();
})?;
join_handles.push(join_handle);
}
Ok(FuseDaemon {
session,
threads: join_handles,
})
}
pub fn unmount(&mut self) -> Result<(), io::Error> {
self.session
.umount()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
for thread in self.threads.drain(..) {
thread.join().map_err(|_| {
io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread")
})?;
}
Ok(())
}
}
impl Drop for FuseDaemon {
fn drop(&mut self) {
if let Err(error) = self.unmount() {
error!(?error, "failed to unmont fuse filesystem")
}
}
}

View file

@ -8,11 +8,12 @@ use tempfile::TempDir;
use crate::blobservice::BlobService; use crate::blobservice::BlobService;
use crate::directoryservice::DirectoryService; use crate::directoryservice::DirectoryService;
use crate::fs::{fuse::FuseDaemon, TvixStoreFs};
use crate::pathinfoservice::PathInfoService; use crate::pathinfoservice::PathInfoService;
use crate::proto;
use crate::proto::{DirectoryNode, FileNode, PathInfo}; use crate::proto::{DirectoryNode, FileNode, PathInfo};
use crate::tests::fixtures; use crate::tests::fixtures;
use crate::tests::utils::{gen_blob_service, gen_directory_service, gen_pathinfo_service}; use crate::tests::utils::{gen_blob_service, gen_directory_service, gen_pathinfo_service};
use crate::{proto, FuseDaemon, FUSE};
const BLOB_A_NAME: &str = "00000000000000000000000000000000-test"; const BLOB_A_NAME: &str = "00000000000000000000000000000000-test";
const BLOB_B_NAME: &str = "55555555555555555555555555555555-test"; const BLOB_B_NAME: &str = "55555555555555555555555555555555-test";
@ -41,7 +42,7 @@ fn do_mount<P: AsRef<Path>>(
mountpoint: P, mountpoint: P,
list_root: bool, list_root: bool,
) -> io::Result<FuseDaemon> { ) -> io::Result<FuseDaemon> {
let fs = FUSE::new( let fs = TvixStoreFs::new(
blob_service, blob_service,
directory_service, directory_service,
path_info_service, path_info_service,

View file

@ -1,7 +1,8 @@
mod digests; mod digests;
mod errors; mod errors;
#[cfg(feature = "fuse")]
mod fuse; #[cfg(feature = "fs")]
pub mod fs;
pub mod blobservice; pub mod blobservice;
pub mod directoryservice; pub mod directoryservice;
@ -13,8 +14,5 @@ pub mod proto;
pub use digests::B3Digest; pub use digests::B3Digest;
pub use errors::Error; pub use errors::Error;
#[cfg(feature = "fuse")]
pub use fuse::{FuseDaemon, FUSE};
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;