refactor(tvix/store/fs): Decouple TvixStoreFs from Nix concepts
To support tvix builds, we need to be able to use the `TvixStoreFs` to materialize the sandbox's /nix/store filesystem with just the list of inputs needed for the build. Currently we'd need to seed an in-memory `PathInfoService`, which includes more functionality than what is required for `TvixStoreFs`. Additionally, the `PathInfoService` is specific to Nix. By decoupling `TvixStoreFs` and `PathInfoService`, we allow for usage of `TvixStoreFs` with `tvix-castore` without needing a `PathInfoService`. This introduces a new `RootNodes` trait which provides a way for the filesystem to look up CA nodes via their basename in the root directory of the filesystem. We then implement `RootNodes` for any `PathInfoService`. Additionally, the filesystem root inode tracker now stores basenames rather than `StorePath`s since `StorePath`s are specific to Nix. As a followup we can rename `TvixStoreFs` to `TvixCaStoreFs` and move it to the `castore` crate (or its own crate). b/341 Change-Id: I928372955017c23b1bf2b37190cbc508a4ed10d5 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10363 Tested-by: BuildkiteCI Reviewed-by: flokli <flokli@flokli.de>
This commit is contained in:
parent
91b6c13f90
commit
6815572274
2 changed files with 119 additions and 76 deletions
|
@ -1,6 +1,7 @@
|
|||
mod file_attr;
|
||||
mod inode_tracker;
|
||||
mod inodes;
|
||||
mod root_nodes;
|
||||
|
||||
#[cfg(feature = "fuse")]
|
||||
pub mod fuse;
|
||||
|
@ -11,12 +12,9 @@ pub mod virtiofs;
|
|||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
use crate::pathinfoservice::PathInfoService;
|
||||
|
||||
use fuse_backend_rs::abi::fuse_abi::stat64;
|
||||
use fuse_backend_rs::api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID};
|
||||
use futures::StreamExt;
|
||||
use nix_compat::store_path::StorePath;
|
||||
use parking_lot::RwLock;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
|
@ -38,6 +36,7 @@ use tvix_castore::{
|
|||
B3Digest,
|
||||
};
|
||||
|
||||
use self::root_nodes::RootNodes;
|
||||
use self::{
|
||||
file_attr::{gen_file_attr, ROOT_FILE_ATTR},
|
||||
inode_tracker::InodeTracker,
|
||||
|
@ -45,12 +44,7 @@ use self::{
|
|||
};
|
||||
|
||||
/// This implements a read-only FUSE filesystem for a tvix-store
|
||||
/// with the passed [BlobService], [DirectoryService] and [PathInfoService].
|
||||
///
|
||||
/// We don't allow listing on the root mountpoint (inode 0).
|
||||
/// In the future, this might be made configurable once a listing method is
|
||||
/// added to [self.path_info_service], and then show all store paths in that
|
||||
/// store.
|
||||
/// with the passed [BlobService], [DirectoryService] and [RootNodes].
|
||||
///
|
||||
/// Linux uses inodes in filesystems. When implementing FUSE, most calls are
|
||||
/// *for* a given inode.
|
||||
|
@ -59,7 +53,7 @@ use self::{
|
|||
/// corresponding store nodes.
|
||||
///
|
||||
/// We internally delegate all inode allocation and state keeping to the
|
||||
/// inode tracker, and store the currently "explored" store paths together with
|
||||
/// inode tracker, and store the currently "explored" root nodes together with
|
||||
/// root inode of the root.
|
||||
///
|
||||
/// There's some places where inodes are allocated / data inserted into
|
||||
|
@ -78,16 +72,16 @@ use self::{
|
|||
/// Due to the above being valid across the whole store, and considering the
|
||||
/// merkle structure is a DAG, not a tree, this also means we can't do "bucketed
|
||||
/// allocation", aka reserve Directory.size inodes for each PathInfo.
|
||||
pub struct TvixStoreFs {
|
||||
pub struct TvixStoreFs<RN> {
|
||||
blob_service: Arc<dyn BlobService>,
|
||||
directory_service: Arc<dyn DirectoryService>,
|
||||
path_info_service: Arc<dyn PathInfoService>,
|
||||
root_nodes_provider: RN,
|
||||
|
||||
/// Whether to (try) listing elements in the root.
|
||||
list_root: bool,
|
||||
|
||||
/// This maps a given StorePath to the inode we allocated for the root inode.
|
||||
store_paths: RwLock<HashMap<StorePath, u64>>,
|
||||
root_nodes: RwLock<HashMap<Vec<u8>, u64>>,
|
||||
|
||||
/// This keeps track of inodes and data alongside them.
|
||||
inode_tracker: RwLock<InodeTracker>,
|
||||
|
@ -101,21 +95,24 @@ pub struct TvixStoreFs {
|
|||
tokio_handle: tokio::runtime::Handle,
|
||||
}
|
||||
|
||||
impl TvixStoreFs {
|
||||
impl<RN> TvixStoreFs<RN>
|
||||
where
|
||||
RN: RootNodes + Clone + 'static,
|
||||
{
|
||||
pub fn new(
|
||||
blob_service: Arc<dyn BlobService>,
|
||||
directory_service: Arc<dyn DirectoryService>,
|
||||
path_info_service: Arc<dyn PathInfoService>,
|
||||
root_nodes_provider: RN,
|
||||
list_root: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
blob_service,
|
||||
directory_service,
|
||||
path_info_service,
|
||||
root_nodes_provider,
|
||||
|
||||
list_root,
|
||||
|
||||
store_paths: RwLock::new(HashMap::default()),
|
||||
root_nodes: RwLock::new(HashMap::default()),
|
||||
inode_tracker: RwLock::new(Default::default()),
|
||||
|
||||
file_handles: RwLock::new(Default::default()),
|
||||
|
@ -124,11 +121,10 @@ impl TvixStoreFs {
|
|||
}
|
||||
}
|
||||
|
||||
/// Retrieves the inode for a given StorePath, if present.
|
||||
/// This obtains a read lock on self.store_paths.
|
||||
fn get_inode_for_store_path(&self, store_path: &StorePath) -> Option<u64> {
|
||||
let store_paths = self.store_paths.read();
|
||||
store_paths.get(store_path).cloned()
|
||||
/// Retrieves the inode for a given root node basename, if present.
|
||||
/// This obtains a read lock on self.root_nodes.
|
||||
fn get_inode_for_root_name(&self, name: &[u8]) -> Option<u64> {
|
||||
self.root_nodes.read().get(name).cloned()
|
||||
}
|
||||
|
||||
/// For a given inode, look up the given directory behind it (from
|
||||
|
@ -200,27 +196,19 @@ impl TvixStoreFs {
|
|||
|
||||
/// This will turn a lookup request for a name in the root to a ino and
|
||||
/// [InodeData].
|
||||
/// It will peek in [self.store_paths], and then either look it up from
|
||||
/// It will peek in [self.root_nodes], and then either look it up from
|
||||
/// [self.inode_tracker],
|
||||
/// or otherwise fetch from [self.path_info_service], and then insert into
|
||||
/// or otherwise fetch from [self.root_nodes], and then insert into
|
||||
/// [self.inode_tracker].
|
||||
/// In the case the name can't be found, a libc::ENOENT is returned.
|
||||
fn name_in_root_to_ino_and_data(
|
||||
&self,
|
||||
name: &std::ffi::CStr,
|
||||
) -> io::Result<(u64, Arc<InodeData>)> {
|
||||
// parse the name into a [StorePath].
|
||||
let store_path = StorePath::from_bytes(name.to_bytes()).map_err(|e| {
|
||||
debug!(e=?e, "unable to parse as store path");
|
||||
// This is not an error, but a "ENOENT", as someone can stat
|
||||
// a file inside the root that's no valid store path
|
||||
io::Error::from_raw_os_error(libc::ENOENT)
|
||||
})?;
|
||||
|
||||
// Look up the inode for that store path.
|
||||
// Look up the inode for that root node.
|
||||
// If there's one, [self.inode_tracker] MUST also contain the data,
|
||||
// which we can then return.
|
||||
if let Some(inode) = self.get_inode_for_store_path(&store_path) {
|
||||
if let Some(inode) = self.get_inode_for_root_name(name.to_bytes()) {
|
||||
return Ok((
|
||||
inode,
|
||||
self.inode_tracker
|
||||
|
@ -231,46 +219,42 @@ impl TvixStoreFs {
|
|||
));
|
||||
}
|
||||
|
||||
// We don't have it yet, look it up in [self.path_info_service].
|
||||
// We don't have it yet, look it up in [self.root_nodes].
|
||||
match self.tokio_handle.block_on({
|
||||
let path_info_service = self.path_info_service.clone();
|
||||
let digest = *store_path.digest();
|
||||
async move { path_info_service.get(digest).await }
|
||||
let root_nodes_provider = self.root_nodes_provider.clone();
|
||||
async move { root_nodes_provider.get_by_basename(name.to_bytes()).await }
|
||||
}) {
|
||||
// if there was an error looking up the path_info, propagate up an IO error.
|
||||
// if there was an error looking up the root node, propagate up an IO error.
|
||||
Err(_e) => Err(io::Error::from_raw_os_error(libc::EIO)),
|
||||
// the pathinfo doesn't exist, so the file doesn't exist.
|
||||
// the root node doesn't exist, so the file doesn't exist.
|
||||
Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)),
|
||||
// The pathinfo does exist
|
||||
Ok(Some(path_info)) => {
|
||||
// There must be a root node (ensured by the validation happening inside clients)
|
||||
let root_node = path_info.node.unwrap().node.unwrap();
|
||||
|
||||
// The root node does exist
|
||||
Ok(Some(root_node)) => {
|
||||
// The name must match what's passed in the lookup, otherwise this is also a ENOENT.
|
||||
if root_node.get_name() != store_path.to_string().as_bytes() {
|
||||
debug!(root_node.name=?root_node.get_name(), store_path.name=%store_path.to_string(), "store path mismatch");
|
||||
if root_node.get_name() != name.to_bytes() {
|
||||
debug!(root_node.name=?root_node.get_name(), found_node.name=%name.to_string_lossy(), "node name mismatch");
|
||||
return Err(io::Error::from_raw_os_error(libc::ENOENT));
|
||||
}
|
||||
|
||||
// Let's check if someone else beat us to updating the inode tracker and
|
||||
// store_paths map. This avoids locking inode_tracker for writing.
|
||||
if let Some(ino) = self.store_paths.read().get(&store_path) {
|
||||
// root_nodes map. This avoids locking inode_tracker for writing.
|
||||
if let Some(ino) = self.root_nodes.read().get(name.to_bytes()) {
|
||||
return Ok((
|
||||
*ino,
|
||||
self.inode_tracker.read().get(*ino).expect("must exist"),
|
||||
));
|
||||
}
|
||||
|
||||
// Only in case it doesn't, lock [self.store_paths] and
|
||||
// Only in case it doesn't, lock [self.root_nodes] and
|
||||
// [self.inode_tracker] for writing.
|
||||
let mut store_paths = self.store_paths.write();
|
||||
let mut root_nodes = self.root_nodes.write();
|
||||
let mut inode_tracker = self.inode_tracker.write();
|
||||
|
||||
// insert the (sparse) inode data and register in
|
||||
// self.store_paths.
|
||||
// self.root_nodes.
|
||||
let inode_data: InodeData = (&root_node).into();
|
||||
let ino = inode_tracker.put(inode_data.clone());
|
||||
store_paths.insert(store_path, ino);
|
||||
root_nodes.insert(name.to_bytes().into(), ino);
|
||||
|
||||
Ok((ino, Arc::new(inode_data)))
|
||||
}
|
||||
|
@ -278,7 +262,10 @@ impl TvixStoreFs {
|
|||
}
|
||||
}
|
||||
|
||||
impl FileSystem for TvixStoreFs {
|
||||
impl<RN> FileSystem for TvixStoreFs<RN>
|
||||
where
|
||||
RN: RootNodes + Clone + 'static,
|
||||
{
|
||||
type Handle = u64;
|
||||
type Inode = u64;
|
||||
|
||||
|
@ -317,7 +304,7 @@ impl FileSystem for TvixStoreFs {
|
|||
|
||||
// This goes from a parent inode to a node.
|
||||
// - If the parent is [ROOT_ID], we need to check
|
||||
// [self.store_paths] (fetching from PathInfoService if needed)
|
||||
// [self.root_nodes] (fetching from a [RootNode] provider if needed)
|
||||
// - Otherwise, lookup the parent in [self.inode_tracker] (which must be
|
||||
// a [InodeData::Directory]), and find the child with that name.
|
||||
if parent == ROOT_ID {
|
||||
|
@ -380,15 +367,15 @@ impl FileSystem for TvixStoreFs {
|
|||
if !self.list_root {
|
||||
return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
|
||||
} else {
|
||||
let path_info_service = self.path_info_service.clone();
|
||||
let root_nodes_provider = self.root_nodes_provider.clone();
|
||||
let (tx, mut rx) = mpsc::channel(16);
|
||||
|
||||
// This task will run in the background immediately and will exit
|
||||
// after the stream ends or if we no longer want any more entries.
|
||||
self.tokio_handle.spawn(async move {
|
||||
let mut stream = path_info_service.list().skip(offset as usize).enumerate();
|
||||
while let Some(path_info) = stream.next().await {
|
||||
if tx.send(path_info).await.is_err() {
|
||||
let mut stream = root_nodes_provider.list().skip(offset as usize).enumerate();
|
||||
while let Some(node) = stream.next().await {
|
||||
if tx.send(node).await.is_err() {
|
||||
// If we get a send error, it means the sync code
|
||||
// doesn't want any more entries.
|
||||
break;
|
||||
|
@ -396,29 +383,24 @@ impl FileSystem for TvixStoreFs {
|
|||
}
|
||||
});
|
||||
|
||||
while let Some((i, path_info)) = rx.blocking_recv() {
|
||||
let path_info = match path_info {
|
||||
while let Some((i, root_node)) = rx.blocking_recv() {
|
||||
let root_node = match root_node {
|
||||
Err(e) => {
|
||||
warn!("failed to retrieve pathinfo: {}", e);
|
||||
return Err(io::Error::from_raw_os_error(libc::EPERM));
|
||||
}
|
||||
Ok(path_info) => path_info,
|
||||
Ok(root_node) => root_node,
|
||||
};
|
||||
|
||||
// We know the root node exists and the store_path can be parsed because clients MUST validate.
|
||||
let root_node = path_info.node.unwrap().node.unwrap();
|
||||
let store_path = StorePath::from_bytes(root_node.get_name()).unwrap();
|
||||
|
||||
let name = root_node.get_name();
|
||||
// obtain the inode, or allocate a new one.
|
||||
let ino = self
|
||||
.get_inode_for_store_path(&store_path)
|
||||
.unwrap_or_else(|| {
|
||||
// insert the (sparse) inode data and register in
|
||||
// self.store_paths.
|
||||
let ino = self.inode_tracker.write().put((&root_node).into());
|
||||
self.store_paths.write().insert(store_path.clone(), ino);
|
||||
ino
|
||||
});
|
||||
let ino = self.get_inode_for_root_name(name).unwrap_or_else(|| {
|
||||
// insert the (sparse) inode data and register in
|
||||
// self.root_nodes.
|
||||
let ino = self.inode_tracker.write().put((&root_node).into());
|
||||
self.root_nodes.write().insert(name.into(), ino);
|
||||
ino
|
||||
});
|
||||
|
||||
let ty = match root_node {
|
||||
Node::Directory(_) => libc::S_IFDIR,
|
||||
|
@ -430,7 +412,7 @@ impl FileSystem for TvixStoreFs {
|
|||
ino,
|
||||
offset: offset + i as u64 + 1,
|
||||
type_: ty,
|
||||
name: store_path.to_string().as_bytes(),
|
||||
name,
|
||||
})?;
|
||||
// If the buffer is full, add_entry will return `Ok(0)`.
|
||||
if written == 0 {
|
||||
|
|
61
tvix/store/src/fs/root_nodes.rs
Normal file
61
tvix/store/src/fs/root_nodes.rs
Normal file
|
@ -0,0 +1,61 @@
|
|||
use std::{ops::Deref, pin::Pin};
|
||||
|
||||
use futures::{Stream, StreamExt};
|
||||
use nix_compat::store_path::StorePath;
|
||||
use tonic::async_trait;
|
||||
use tvix_castore::{proto::node::Node, Error};
|
||||
|
||||
use crate::pathinfoservice::PathInfoService;
|
||||
|
||||
/// Provides an interface for looking up root nodes in tvix-castore by given
|
||||
/// a lookup key (usually the basename), and optionally allow a listing.
|
||||
///
|
||||
#[async_trait]
|
||||
pub trait RootNodes: Send + Sync {
|
||||
/// Looks up a root CA node based on the basename of the node in the root
|
||||
/// directory of the filesystem.
|
||||
async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error>;
|
||||
|
||||
/// Lists all root CA nodes in the filesystem. An error can be returned
|
||||
/// in case listing is not allowed
|
||||
fn list(&self) -> Pin<Box<dyn Stream<Item = Result<Node, Error>> + Send>>;
|
||||
}
|
||||
|
||||
/// Implements root node lookup for any [PathInfoService]. This represents a flat
|
||||
/// directory structure like /nix/store where each entry in the root filesystem
|
||||
/// directory corresponds to a CA node.
|
||||
#[async_trait]
|
||||
impl<T> RootNodes for T
|
||||
where
|
||||
T: Deref<Target = dyn PathInfoService> + Send + Sync,
|
||||
{
|
||||
async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error> {
|
||||
let Ok(store_path) = StorePath::from_bytes(name) else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
Ok(self
|
||||
.deref()
|
||||
.get(*store_path.digest())
|
||||
.await?
|
||||
.map(|path_info| {
|
||||
path_info
|
||||
.node
|
||||
.expect("missing root node")
|
||||
.node
|
||||
.expect("empty node")
|
||||
}))
|
||||
}
|
||||
|
||||
fn list(&self) -> Pin<Box<dyn Stream<Item = Result<Node, Error>> + Send>> {
|
||||
Box::pin(self.deref().list().map(|result| {
|
||||
result.map(|path_info| {
|
||||
path_info
|
||||
.node
|
||||
.expect("missing root node")
|
||||
.node
|
||||
.expect("empty node")
|
||||
})
|
||||
}))
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue