chore(tvix): move store/fs to castore/fs
With the recent introduction of the RootNodes trait, there's nothing in the fs module pulling in tvix-store dependencies, so it can live in tvix-castore. This allows other crates to make use of TvixStoreFs, without having to pull in tvix-store. For example, a tvix-build using a fuse mountpoint at /nix/store doesn't need a PathInfoService to hold the root nodes that should be present, but just a list. tvix-store now has a pathinfoservice/fs module, which contains the necessary glue logic to implement the RootNodes trait for a PathInfoService. To satisfy Rust's orphan rules for trait implementations, we had to add a small wrapper struct. It's mostly hidden away by the make_fs helper function returning a TvixStoreFs. It can't be entirely private, as it's still leaking into the concrete type of TvixStoreFs. tvix-store still has `fuse` and `virtiofs` features, but they now simply enable these features in the `tvix-castore` crate it depends on. The tests for the fuse functionality stay in tvix-store for now, as they populate the root nodes through a PathInfoService. Once the above-mentioned "list of root nodes" implementation exists, we might want to shuffle this around one more time. Fixes b/341. Change-Id: I989f664827a5a361b23b34368d242d10c157c756 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10378 Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Reviewed-by: sterni <sternenseemann@systemli.org>
This commit is contained in:
parent
52cad86195
commit
a5167c508c
18 changed files with 257 additions and 199 deletions
|
@ -10,6 +10,7 @@ bytes = "1.4.0"
|
|||
data-encoding = "2.3.3"
|
||||
futures = "0.3.28"
|
||||
lazy_static = "1.4.0"
|
||||
parking_lot = "0.12.1"
|
||||
pin-project-lite = "0.2.13"
|
||||
prost = "0.12.1"
|
||||
sled = { version = "0.34.7" }
|
||||
|
@ -25,10 +26,42 @@ walkdir = "2.4.0"
|
|||
bstr = "1.6.0"
|
||||
async-tempfile = "0.4.0"
|
||||
|
||||
[dependencies.fuse-backend-rs]
|
||||
optional = true
|
||||
version = "0.11.0"
|
||||
|
||||
[dependencies.libc]
|
||||
optional = true
|
||||
version = "0.2.144"
|
||||
|
||||
[dependencies.tonic-reflection]
|
||||
optional = true
|
||||
version = "0.10.2"
|
||||
|
||||
[dependencies.vhost]
|
||||
optional = true
|
||||
version = "0.6"
|
||||
|
||||
[dependencies.vhost-user-backend]
|
||||
optional = true
|
||||
version = "0.8"
|
||||
|
||||
[dependencies.virtio-queue]
|
||||
optional = true
|
||||
version = "0.7"
|
||||
|
||||
[dependencies.vm-memory]
|
||||
optional = true
|
||||
version = "0.10"
|
||||
|
||||
[dependencies.vmm-sys-util]
|
||||
optional = true
|
||||
version = "0.11"
|
||||
|
||||
[dependencies.virtio-bindings]
|
||||
optional = true
|
||||
version = "0.2.1"
|
||||
|
||||
[build-dependencies]
|
||||
prost-build = "0.12.1"
|
||||
tonic-build = "0.10.2"
|
||||
|
@ -41,4 +74,17 @@ hex-literal = "0.4.1"
|
|||
|
||||
[features]
|
||||
default = []
|
||||
fs = ["dep:libc", "dep:fuse-backend-rs"]
|
||||
virtiofs = [
|
||||
"fs",
|
||||
"dep:vhost",
|
||||
"dep:vhost-user-backend",
|
||||
"dep:virtio-queue",
|
||||
"dep:vm-memory",
|
||||
"dep:vmm-sys-util",
|
||||
"dep:virtio-bindings",
|
||||
"fuse-backend-rs?/vhost-user-fs", # impl FsCacheReqHandler for SlaveFsCacheReq
|
||||
"fuse-backend-rs?/virtiofs",
|
||||
]
|
||||
fuse = ["fs"]
|
||||
tonic-reflection = ["dep:tonic-reflection"]
|
||||
|
|
53
tvix/castore/src/fs/file_attr.rs
Normal file
53
tvix/castore/src/fs/file_attr.rs
Normal file
|
@ -0,0 +1,53 @@
|
|||
#![allow(clippy::unnecessary_cast)] // libc::S_IFDIR is u32 on Linux and u16 on MacOS
|
||||
use super::inodes::{DirectoryInodeData, InodeData};
|
||||
use fuse_backend_rs::abi::fuse_abi::Attr;
|
||||
|
||||
/// The [Attr] describing the root
|
||||
pub const ROOT_FILE_ATTR: Attr = Attr {
|
||||
ino: fuse_backend_rs::api::filesystem::ROOT_ID,
|
||||
size: 0,
|
||||
blksize: 1024,
|
||||
blocks: 0,
|
||||
mode: libc::S_IFDIR as u32 | 0o555,
|
||||
atime: 0,
|
||||
mtime: 0,
|
||||
ctime: 0,
|
||||
atimensec: 0,
|
||||
mtimensec: 0,
|
||||
ctimensec: 0,
|
||||
nlink: 0,
|
||||
uid: 0,
|
||||
gid: 0,
|
||||
rdev: 0,
|
||||
flags: 0,
|
||||
#[cfg(target_os = "macos")]
|
||||
crtime: 0,
|
||||
#[cfg(target_os = "macos")]
|
||||
crtimensec: 0,
|
||||
#[cfg(target_os = "macos")]
|
||||
padding: 0,
|
||||
};
|
||||
|
||||
/// for given &Node and inode, construct an [Attr]
|
||||
pub fn gen_file_attr(inode_data: &InodeData, inode: u64) -> Attr {
|
||||
Attr {
|
||||
ino: inode,
|
||||
// FUTUREWORK: play with this numbers, as it affects read sizes for client applications.
|
||||
blocks: 1024,
|
||||
size: match inode_data {
|
||||
InodeData::Regular(_, size, _) => *size as u64,
|
||||
InodeData::Symlink(target) => target.len() as u64,
|
||||
InodeData::Directory(DirectoryInodeData::Sparse(_, size)) => *size as u64,
|
||||
InodeData::Directory(DirectoryInodeData::Populated(_, ref children)) => {
|
||||
children.len() as u64
|
||||
}
|
||||
},
|
||||
mode: match inode_data {
|
||||
InodeData::Regular(_, _, false) => libc::S_IFREG as u32 | 0o444, // no-executable files
|
||||
InodeData::Regular(_, _, true) => libc::S_IFREG as u32 | 0o555, // executable files
|
||||
InodeData::Symlink(_) => libc::S_IFLNK as u32 | 0o444,
|
||||
InodeData::Directory(_) => libc::S_IFDIR as u32 | 0o555,
|
||||
},
|
||||
..Default::default()
|
||||
}
|
||||
}
|
113
tvix/castore/src/fs/fuse.rs
Normal file
113
tvix/castore/src/fs/fuse.rs
Normal file
|
@ -0,0 +1,113 @@
|
|||
use std::{io, path::Path, sync::Arc, thread};
|
||||
|
||||
use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession};
|
||||
use tracing::error;
|
||||
|
||||
struct FuseServer<FS>
|
||||
where
|
||||
FS: FileSystem + Sync + Send,
|
||||
{
|
||||
server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>,
|
||||
channel: fuse_backend_rs::transport::FuseChannel,
|
||||
}
|
||||
|
||||
#[cfg(target_os = "macos")]
|
||||
const BADFD: libc::c_int = libc::EBADF;
|
||||
#[cfg(target_os = "linux")]
|
||||
const BADFD: libc::c_int = libc::EBADFD;
|
||||
|
||||
impl<FS> FuseServer<FS>
|
||||
where
|
||||
FS: FileSystem + Sync + Send,
|
||||
{
|
||||
fn start(&mut self) -> io::Result<()> {
|
||||
while let Some((reader, writer)) = self
|
||||
.channel
|
||||
.get_request()
|
||||
.map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))?
|
||||
{
|
||||
if let Err(e) = self
|
||||
.server
|
||||
.handle_message(reader, writer.into(), None, None)
|
||||
{
|
||||
match e {
|
||||
// This indicates the session has been shut down.
|
||||
fuse_backend_rs::Error::EncodeMessage(e) if e.raw_os_error() == Some(BADFD) => {
|
||||
break;
|
||||
}
|
||||
error => {
|
||||
error!(?error, "failed to handle fuse request");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FuseDaemon {
|
||||
session: FuseSession,
|
||||
threads: Vec<thread::JoinHandle<()>>,
|
||||
}
|
||||
|
||||
impl FuseDaemon {
|
||||
pub fn new<FS, P>(fs: FS, mountpoint: P, threads: usize) -> Result<Self, io::Error>
|
||||
where
|
||||
FS: FileSystem + Sync + Send + 'static,
|
||||
P: AsRef<Path>,
|
||||
{
|
||||
let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs)));
|
||||
|
||||
let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true)
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
session.set_allow_other(false);
|
||||
session
|
||||
.mount()
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
|
||||
let mut join_handles = Vec::with_capacity(threads);
|
||||
for _ in 0..threads {
|
||||
let mut server = FuseServer {
|
||||
server: server.clone(),
|
||||
channel: session
|
||||
.new_channel()
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?,
|
||||
};
|
||||
let join_handle = thread::Builder::new()
|
||||
.name("fuse_server".to_string())
|
||||
.spawn(move || {
|
||||
let _ = server.start();
|
||||
})?;
|
||||
join_handles.push(join_handle);
|
||||
}
|
||||
|
||||
Ok(FuseDaemon {
|
||||
session,
|
||||
threads: join_handles,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn unmount(&mut self) -> Result<(), io::Error> {
|
||||
self.session
|
||||
.umount()
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
|
||||
|
||||
for thread in self.threads.drain(..) {
|
||||
thread.join().map_err(|_| {
|
||||
io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread")
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for FuseDaemon {
|
||||
fn drop(&mut self) {
|
||||
if let Err(error) = self.unmount() {
|
||||
error!(?error, "failed to unmont fuse filesystem")
|
||||
}
|
||||
}
|
||||
}
|
207
tvix/castore/src/fs/inode_tracker.rs
Normal file
207
tvix/castore/src/fs/inode_tracker.rs
Normal file
|
@ -0,0 +1,207 @@
|
|||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use super::inodes::{DirectoryInodeData, InodeData};
|
||||
use crate::B3Digest;
|
||||
|
||||
/// InodeTracker keeps track of inodes, stores data being these inodes and deals
|
||||
/// with inode allocation.
|
||||
pub struct InodeTracker {
|
||||
data: HashMap<u64, Arc<InodeData>>,
|
||||
|
||||
// lookup table for blobs by their B3Digest
|
||||
blob_digest_to_inode: HashMap<B3Digest, u64>,
|
||||
|
||||
// lookup table for symlinks by their target
|
||||
symlink_target_to_inode: HashMap<bytes::Bytes, u64>,
|
||||
|
||||
// lookup table for directories by their B3Digest.
|
||||
// Note the corresponding directory may not be present in data yet.
|
||||
directory_digest_to_inode: HashMap<B3Digest, u64>,
|
||||
|
||||
// the next inode to allocate
|
||||
next_inode: u64,
|
||||
}
|
||||
|
||||
impl Default for InodeTracker {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
data: Default::default(),
|
||||
|
||||
blob_digest_to_inode: Default::default(),
|
||||
symlink_target_to_inode: Default::default(),
|
||||
directory_digest_to_inode: Default::default(),
|
||||
|
||||
next_inode: 2,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl InodeTracker {
|
||||
// Retrieves data for a given inode, if it exists.
|
||||
pub fn get(&self, ino: u64) -> Option<Arc<InodeData>> {
|
||||
self.data.get(&ino).cloned()
|
||||
}
|
||||
|
||||
// Replaces data for a given inode.
|
||||
// Panics if the inode doesn't already exist.
|
||||
pub fn replace(&mut self, ino: u64, data: Arc<InodeData>) {
|
||||
if self.data.insert(ino, data).is_none() {
|
||||
panic!("replace called on unknown inode");
|
||||
}
|
||||
}
|
||||
|
||||
// Stores data and returns the inode for it.
|
||||
// In case an inode has already been allocated for the same data, that inode
|
||||
// is returned, otherwise a new one is allocated.
|
||||
// In case data is a [InodeData::Directory], inodes for all items are looked
|
||||
// up
|
||||
pub fn put(&mut self, data: InodeData) -> u64 {
|
||||
match data {
|
||||
InodeData::Regular(ref digest, _, _) => {
|
||||
match self.blob_digest_to_inode.get(digest) {
|
||||
Some(found_ino) => {
|
||||
// We already have it, return the inode.
|
||||
*found_ino
|
||||
}
|
||||
None => self.insert_and_increment(data),
|
||||
}
|
||||
}
|
||||
InodeData::Symlink(ref target) => {
|
||||
match self.symlink_target_to_inode.get(target) {
|
||||
Some(found_ino) => {
|
||||
// We already have it, return the inode.
|
||||
*found_ino
|
||||
}
|
||||
None => self.insert_and_increment(data),
|
||||
}
|
||||
}
|
||||
InodeData::Directory(DirectoryInodeData::Sparse(ref digest, _size)) => {
|
||||
// check the lookup table if the B3Digest is known.
|
||||
match self.directory_digest_to_inode.get(digest) {
|
||||
Some(found_ino) => {
|
||||
// We already have it, return the inode.
|
||||
*found_ino
|
||||
}
|
||||
None => {
|
||||
// insert and return the inode
|
||||
self.insert_and_increment(data)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Inserting [DirectoryInodeData::Populated] doesn't normally happen,
|
||||
// only via [replace].
|
||||
InodeData::Directory(DirectoryInodeData::Populated(..)) => {
|
||||
unreachable!("should never be called with DirectoryInodeData::Populated")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Inserts the data and returns the inode it was stored at, while
|
||||
// incrementing next_inode.
|
||||
fn insert_and_increment(&mut self, data: InodeData) -> u64 {
|
||||
let ino = self.next_inode;
|
||||
// insert into lookup tables
|
||||
match data {
|
||||
InodeData::Regular(ref digest, _, _) => {
|
||||
self.blob_digest_to_inode.insert(digest.clone(), ino);
|
||||
}
|
||||
InodeData::Symlink(ref target) => {
|
||||
self.symlink_target_to_inode.insert(target.clone(), ino);
|
||||
}
|
||||
InodeData::Directory(DirectoryInodeData::Sparse(ref digest, _size)) => {
|
||||
self.directory_digest_to_inode.insert(digest.clone(), ino);
|
||||
}
|
||||
// This is currently not used outside test fixtures.
|
||||
// Usually a [DirectoryInodeData::Sparse] is inserted and later
|
||||
// "upgraded" with more data.
|
||||
// However, as a future optimization, a lookup for a PathInfo could trigger a
|
||||
// [DirectoryService::get_recursive()] request that "forks into
|
||||
// background" and prepopulates all Directories in a closure.
|
||||
InodeData::Directory(DirectoryInodeData::Populated(ref digest, _)) => {
|
||||
self.directory_digest_to_inode.insert(digest.clone(), ino);
|
||||
}
|
||||
}
|
||||
// Insert data
|
||||
self.data.insert(ino, Arc::new(data));
|
||||
|
||||
// increment inode counter and return old inode.
|
||||
self.next_inode += 1;
|
||||
ino
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use crate::fixtures;

    use super::InodeData;
    use super::InodeTracker;

    /// Looking up an inode that was never allocated yields `None`.
    #[test]
    fn get_nonexistent() {
        let tracker = InodeTracker::default();
        assert!(tracker.get(1).is_none());
    }

    /// Putting a regular file allocates an inode, and putting the same file
    /// again returns that same inode.
    #[test]
    fn put_regular() {
        let mut tracker = InodeTracker::default();
        let blob_a = InodeData::Regular(
            fixtures::BLOB_A_DIGEST.clone(),
            fixtures::BLOB_A.len() as u64,
            false,
        );

        // first insertion
        let ino = tracker.put(blob_a.clone());

        // the stored data must be the regular file we put in
        let stored = tracker.get(ino).expect("must be some");
        match &*stored {
            InodeData::Regular(digest, _, _) => {
                assert_eq!(&fixtures::BLOB_A_DIGEST.clone(), digest);
            }
            InodeData::Symlink(_) | InodeData::Directory(..) => panic!("wrong type"),
        }

        // same content, same inode
        assert_eq!(ino, tracker.put(blob_a));

        // different content, different inode
        let blob_b = InodeData::Regular(
            fixtures::BLOB_B_DIGEST.clone(),
            fixtures::BLOB_B.len() as u64,
            false,
        );
        assert_ne!(ino, tracker.put(blob_b));
    }

    /// Putting a symlink allocates an inode, and putting the same symlink
    /// again returns that same inode.
    #[test]
    fn put_symlink() {
        let mut tracker = InodeTracker::default();
        let link = InodeData::Symlink("target".into());

        // first insertion
        let ino = tracker.put(link.clone());

        // the stored data must be the symlink we put in
        let stored = tracker.get(ino).expect("must be some");
        match &*stored {
            InodeData::Symlink(target) => {
                assert_eq!(b"target".to_vec(), *target);
            }
            InodeData::Regular(..) | InodeData::Directory(..) => panic!("wrong type"),
        }

        // same target, same inode
        assert_eq!(ino, tracker.put(link));

        // different target, different inode
        assert_ne!(ino, tracker.put(InodeData::Symlink("target2".into())));
    }
}
|
57
tvix/castore/src/fs/inodes.rs
Normal file
57
tvix/castore/src/fs/inodes.rs
Normal file
|
@ -0,0 +1,57 @@
|
|||
//! This module contains all the data structures used to track information
|
||||
//! about inodes, which present tvix-castore nodes in a filesystem.
|
||||
use crate::proto as castorepb;
|
||||
use crate::B3Digest;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum InodeData {
|
||||
Regular(B3Digest, u64, bool), // digest, size, executable
|
||||
Symlink(bytes::Bytes), // target
|
||||
Directory(DirectoryInodeData), // either [DirectoryInodeData:Sparse] or [DirectoryInodeData:Populated]
|
||||
}
|
||||
|
||||
/// This encodes the two different states of [InodeData::Directory].
|
||||
/// Either the data still is sparse (we only saw a [castorepb::DirectoryNode],
|
||||
/// but didn't fetch the [castorepb::Directory] struct yet, or we processed a
|
||||
/// lookup and did fetch the data.
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum DirectoryInodeData {
|
||||
Sparse(B3Digest, u64), // digest, size
|
||||
Populated(B3Digest, Vec<(u64, castorepb::node::Node)>), // [(child_inode, node)]
|
||||
}
|
||||
|
||||
/// Converts any castore node into its [InodeData], by dispatching to the
/// conversion for the concrete node type.
impl From<&castorepb::node::Node> for InodeData {
    fn from(value: &castorepb::node::Node) -> Self {
        match value {
            castorepb::node::Node::Directory(directory_node) => Self::from(directory_node),
            castorepb::node::Node::File(file_node) => Self::from(file_node),
            castorepb::node::Node::Symlink(symlink_node) => Self::from(symlink_node),
        }
    }
}
|
||||
|
||||
/// Converts a SymlinkNode to an [InodeData::Symlink] holding a copy of its target.
impl From<&castorepb::SymlinkNode> for InodeData {
    fn from(value: &castorepb::SymlinkNode) -> Self {
        let target = value.target.clone();
        Self::Symlink(target)
    }
}
|
||||
|
||||
/// Converts a FileNode to an [InodeData::Regular].
///
/// # Panics
///
/// Panics if the digest stored in the node can't be converted into a
/// [B3Digest].
impl From<&castorepb::FileNode> for InodeData {
    fn from(value: &castorepb::FileNode) -> Self {
        let digest = value.digest.clone().try_into().unwrap();
        Self::Regular(digest, value.size, value.executable)
    }
}
|
||||
|
||||
/// Converts a DirectoryNode to a sparsely populated InodeData::Directory.
|
||||
impl From<&castorepb::DirectoryNode> for InodeData {
|
||||
fn from(value: &castorepb::DirectoryNode) -> Self {
|
||||
InodeData::Directory(DirectoryInodeData::Sparse(
|
||||
value.digest.clone().try_into().unwrap(),
|
||||
value.size,
|
||||
))
|
||||
}
|
||||
}
|
627
tvix/castore/src/fs/mod.rs
Normal file
627
tvix/castore/src/fs/mod.rs
Normal file
|
@ -0,0 +1,627 @@
|
|||
mod file_attr;
|
||||
mod inode_tracker;
|
||||
mod inodes;
|
||||
mod root_nodes;
|
||||
|
||||
#[cfg(feature = "fuse")]
|
||||
pub mod fuse;
|
||||
|
||||
#[cfg(feature = "virtiofs")]
|
||||
pub mod virtiofs;
|
||||
|
||||
use crate::proto as castorepb;
|
||||
use crate::{
|
||||
blobservice::{BlobReader, BlobService},
|
||||
directoryservice::DirectoryService,
|
||||
proto::{node::Node, NamedNode},
|
||||
B3Digest,
|
||||
};
|
||||
use fuse_backend_rs::abi::fuse_abi::stat64;
|
||||
use fuse_backend_rs::api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID};
|
||||
use futures::StreamExt;
|
||||
use parking_lot::RwLock;
|
||||
use std::ops::Deref;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
io,
|
||||
sync::atomic::AtomicU64,
|
||||
sync::{atomic::Ordering, Arc},
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::{
|
||||
io::{AsyncReadExt, AsyncSeekExt},
|
||||
sync::mpsc,
|
||||
};
|
||||
use tracing::{debug, info_span, instrument, warn};
|
||||
|
||||
pub use self::root_nodes::RootNodes;
|
||||
use self::{
|
||||
file_attr::{gen_file_attr, ROOT_FILE_ATTR},
|
||||
inode_tracker::InodeTracker,
|
||||
inodes::{DirectoryInodeData, InodeData},
|
||||
};
|
||||
|
||||
/// This implements a read-only FUSE filesystem for a tvix-store
|
||||
/// with the passed [BlobService], [DirectoryService] and [RootNodes].
|
||||
///
|
||||
/// Linux uses inodes in filesystems. When implementing FUSE, most calls are
|
||||
/// *for* a given inode.
|
||||
///
|
||||
/// This means, we need to have a stable mapping of inode numbers to the
|
||||
/// corresponding store nodes.
|
||||
///
|
||||
/// We internally delegate all inode allocation and state keeping to the
|
||||
/// inode tracker.
|
||||
/// We store a mapping from currently "explored" names in the root to their
|
||||
/// inode.
|
||||
///
|
||||
/// There's some places where inodes are allocated / data inserted into
|
||||
/// the inode tracker, if not allocated before already:
|
||||
/// - Processing a `lookup` request, either in the mount root, or somewhere
|
||||
/// deeper.
|
||||
/// - Processing a `readdir` request
|
||||
///
|
||||
/// Things pointing to the same contents get the same inodes, irrespective of
|
||||
/// their own location.
|
||||
/// This means:
|
||||
/// - Symlinks with the same target will get the same inode.
|
||||
/// - Regular/executable files with the same contents will get the same inode
|
||||
/// - Directories with the same contents will get the same inode.
|
||||
///
|
||||
/// Due to the above being valid across the whole store, and considering the
|
||||
/// merkle structure is a DAG, not a tree, this also means we can't do "bucketed
|
||||
/// allocation", aka reserve Directory.size inodes for each directory node we
|
||||
/// explore.
|
||||
/// Tests for this live in the tvix-store crate.
|
||||
pub struct TvixStoreFs<BS, DS, RN> {
|
||||
blob_service: BS,
|
||||
directory_service: DS,
|
||||
root_nodes_provider: RN,
|
||||
|
||||
/// Whether to (try) listing elements in the root.
|
||||
list_root: bool,
|
||||
|
||||
/// This maps a given basename in the root to the inode we allocated for the node.
|
||||
root_nodes: RwLock<HashMap<Vec<u8>, u64>>,
|
||||
|
||||
/// This keeps track of inodes and data alongside them.
|
||||
inode_tracker: RwLock<InodeTracker>,
|
||||
|
||||
/// This holds all open file handles
|
||||
#[allow(clippy::type_complexity)]
|
||||
file_handles: RwLock<HashMap<u64, Arc<tokio::sync::Mutex<Box<dyn BlobReader>>>>>,
|
||||
|
||||
next_file_handle: AtomicU64,
|
||||
|
||||
tokio_handle: tokio::runtime::Handle,
|
||||
}
|
||||
|
||||
impl<BS, DS, RN> TvixStoreFs<BS, DS, RN>
|
||||
where
|
||||
BS: Deref<Target = dyn BlobService> + Clone + Send,
|
||||
DS: Deref<Target = dyn DirectoryService> + Clone + Send + 'static,
|
||||
RN: RootNodes + Clone + 'static,
|
||||
{
|
||||
pub fn new(
|
||||
blob_service: BS,
|
||||
directory_service: DS,
|
||||
root_nodes_provider: RN,
|
||||
list_root: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
blob_service,
|
||||
directory_service,
|
||||
root_nodes_provider,
|
||||
|
||||
list_root,
|
||||
|
||||
root_nodes: RwLock::new(HashMap::default()),
|
||||
inode_tracker: RwLock::new(Default::default()),
|
||||
|
||||
file_handles: RwLock::new(Default::default()),
|
||||
next_file_handle: AtomicU64::new(1),
|
||||
tokio_handle: tokio::runtime::Handle::current(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Retrieves the inode for a given root node basename, if present.
|
||||
/// This obtains a read lock on self.root_nodes.
|
||||
fn get_inode_for_root_name(&self, name: &[u8]) -> Option<u64> {
|
||||
self.root_nodes.read().get(name).cloned()
|
||||
}
|
||||
|
||||
/// For a given inode, look up the given directory behind it (from
|
||||
/// self.inode_tracker), and return its children.
|
||||
/// The inode_tracker MUST know about this inode already, and it MUST point
|
||||
/// to a [InodeData::Directory].
|
||||
/// It is ok if it's a [DirectoryInodeData::Sparse] - in that case, a lookup
|
||||
/// in self.directory_service is performed, and self.inode_tracker is updated with the
|
||||
/// [DirectoryInodeData::Populated].
|
||||
#[instrument(skip(self), err)]
|
||||
fn get_directory_children(&self, ino: u64) -> io::Result<(B3Digest, Vec<(u64, Node)>)> {
|
||||
let data = self.inode_tracker.read().get(ino).unwrap();
|
||||
match *data {
|
||||
// if it's populated already, return children.
|
||||
InodeData::Directory(DirectoryInodeData::Populated(
|
||||
ref parent_digest,
|
||||
ref children,
|
||||
)) => Ok((parent_digest.clone(), children.clone())),
|
||||
// if it's sparse, fetch data using directory_service, populate child nodes
|
||||
// and update it in [self.inode_tracker].
|
||||
InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => {
|
||||
let directory = self
|
||||
.tokio_handle
|
||||
.block_on(self.tokio_handle.spawn({
|
||||
let directory_service = self.directory_service.clone();
|
||||
let parent_digest = parent_digest.to_owned();
|
||||
async move { directory_service.get(&parent_digest).await }
|
||||
}))
|
||||
.unwrap()?
|
||||
.ok_or_else(|| {
|
||||
warn!(directory.digest=%parent_digest, "directory not found");
|
||||
// If the Directory can't be found, this is a hole, bail out.
|
||||
io::Error::from_raw_os_error(libc::EIO)
|
||||
})?;
|
||||
|
||||
// Turn the retrieved directory into a InodeData::Directory(DirectoryInodeData::Populated(..)),
|
||||
// allocating inodes for the children on the way.
|
||||
let children = {
|
||||
let mut inode_tracker = self.inode_tracker.write();
|
||||
|
||||
let children: Vec<(u64, castorepb::node::Node)> = directory
|
||||
.nodes()
|
||||
.map(|child_node| {
|
||||
let child_ino = inode_tracker.put((&child_node).into());
|
||||
(child_ino, child_node)
|
||||
})
|
||||
.collect();
|
||||
|
||||
// replace.
|
||||
inode_tracker.replace(
|
||||
ino,
|
||||
Arc::new(InodeData::Directory(DirectoryInodeData::Populated(
|
||||
parent_digest.clone(),
|
||||
children.clone(),
|
||||
))),
|
||||
);
|
||||
|
||||
children
|
||||
};
|
||||
|
||||
Ok((parent_digest.clone(), children))
|
||||
}
|
||||
// if the parent inode was not a directory, this doesn't make sense
|
||||
InodeData::Regular(..) | InodeData::Symlink(_) => {
|
||||
Err(io::Error::from_raw_os_error(libc::ENOTDIR))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// This will turn a lookup request for a name in the root to a ino and
|
||||
/// [InodeData].
|
||||
/// It will peek in [self.root_nodes], and then either look it up from
|
||||
/// [self.inode_tracker],
|
||||
/// or otherwise fetch from [self.root_nodes], and then insert into
|
||||
/// [self.inode_tracker].
|
||||
/// In the case the name can't be found, a libc::ENOENT is returned.
|
||||
fn name_in_root_to_ino_and_data(
|
||||
&self,
|
||||
name: &std::ffi::CStr,
|
||||
) -> io::Result<(u64, Arc<InodeData>)> {
|
||||
// Look up the inode for that root node.
|
||||
// If there's one, [self.inode_tracker] MUST also contain the data,
|
||||
// which we can then return.
|
||||
if let Some(inode) = self.get_inode_for_root_name(name.to_bytes()) {
|
||||
return Ok((
|
||||
inode,
|
||||
self.inode_tracker
|
||||
.read()
|
||||
.get(inode)
|
||||
.expect("must exist")
|
||||
.to_owned(),
|
||||
));
|
||||
}
|
||||
|
||||
// We don't have it yet, look it up in [self.root_nodes].
|
||||
match self.tokio_handle.block_on({
|
||||
let root_nodes_provider = self.root_nodes_provider.clone();
|
||||
async move { root_nodes_provider.get_by_basename(name.to_bytes()).await }
|
||||
}) {
|
||||
// if there was an error looking up the root node, propagate up an IO error.
|
||||
Err(_e) => Err(io::Error::from_raw_os_error(libc::EIO)),
|
||||
// the root node doesn't exist, so the file doesn't exist.
|
||||
Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)),
|
||||
// The root node does exist
|
||||
Ok(Some(root_node)) => {
|
||||
// The name must match what's passed in the lookup, otherwise this is also a ENOENT.
|
||||
if root_node.get_name() != name.to_bytes() {
|
||||
debug!(root_node.name=?root_node.get_name(), found_node.name=%name.to_string_lossy(), "node name mismatch");
|
||||
return Err(io::Error::from_raw_os_error(libc::ENOENT));
|
||||
}
|
||||
|
||||
// Let's check if someone else beat us to updating the inode tracker and
|
||||
// root_nodes map. This avoids locking inode_tracker for writing.
|
||||
if let Some(ino) = self.root_nodes.read().get(name.to_bytes()) {
|
||||
return Ok((
|
||||
*ino,
|
||||
self.inode_tracker.read().get(*ino).expect("must exist"),
|
||||
));
|
||||
}
|
||||
|
||||
// Only in case it doesn't, lock [self.root_nodes] and
|
||||
// [self.inode_tracker] for writing.
|
||||
let mut root_nodes = self.root_nodes.write();
|
||||
let mut inode_tracker = self.inode_tracker.write();
|
||||
|
||||
// insert the (sparse) inode data and register in
|
||||
// self.root_nodes.
|
||||
let inode_data: InodeData = (&root_node).into();
|
||||
let ino = inode_tracker.put(inode_data.clone());
|
||||
root_nodes.insert(name.to_bytes().into(), ino);
|
||||
|
||||
Ok((ino, Arc::new(inode_data)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<BS, DS, RN> FileSystem for TvixStoreFs<BS, DS, RN>
|
||||
where
|
||||
BS: Deref<Target = dyn BlobService> + Clone + Send + 'static,
|
||||
DS: Deref<Target = dyn DirectoryService> + Send + Clone + 'static,
|
||||
RN: RootNodes + Clone + 'static,
|
||||
{
|
||||
type Handle = u64;
|
||||
type Inode = u64;
|
||||
|
||||
fn init(&self, _capable: FsOptions) -> io::Result<FsOptions> {
|
||||
Ok(FsOptions::empty())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, fields(rq.inode = inode))]
|
||||
fn getattr(
|
||||
&self,
|
||||
_ctx: &Context,
|
||||
inode: Self::Inode,
|
||||
_handle: Option<Self::Handle>,
|
||||
) -> io::Result<(stat64, Duration)> {
|
||||
if inode == ROOT_ID {
|
||||
return Ok((ROOT_FILE_ATTR.into(), Duration::MAX));
|
||||
}
|
||||
|
||||
match self.inode_tracker.read().get(inode) {
|
||||
None => Err(io::Error::from_raw_os_error(libc::ENOENT)),
|
||||
Some(node) => {
|
||||
debug!(node = ?node, "found node");
|
||||
Ok((gen_file_attr(&node, inode).into(), Duration::MAX))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, fields(rq.parent_inode = parent, rq.name = ?name))]
|
||||
fn lookup(
|
||||
&self,
|
||||
_ctx: &Context,
|
||||
parent: Self::Inode,
|
||||
name: &std::ffi::CStr,
|
||||
) -> io::Result<fuse_backend_rs::api::filesystem::Entry> {
|
||||
debug!("lookup");
|
||||
|
||||
// This goes from a parent inode to a node.
|
||||
// - If the parent is [ROOT_ID], we need to check
|
||||
// [self.root_nodes] (fetching from a [RootNode] provider if needed)
|
||||
// - Otherwise, lookup the parent in [self.inode_tracker] (which must be
|
||||
// a [InodeData::Directory]), and find the child with that name.
|
||||
if parent == ROOT_ID {
|
||||
let (ino, inode_data) = self.name_in_root_to_ino_and_data(name)?;
|
||||
|
||||
debug!(inode_data=?&inode_data, ino=ino, "Some");
|
||||
return Ok(fuse_backend_rs::api::filesystem::Entry {
|
||||
inode: ino,
|
||||
attr: gen_file_attr(&inode_data, ino).into(),
|
||||
attr_timeout: Duration::MAX,
|
||||
entry_timeout: Duration::MAX,
|
||||
..Default::default()
|
||||
});
|
||||
}
|
||||
// This is the "lookup for "a" inside inode 42.
|
||||
// We already know that inode 42 must be a directory.
|
||||
let (parent_digest, children) = self.get_directory_children(parent)?;
|
||||
|
||||
let span = info_span!("lookup", directory.digest = %parent_digest);
|
||||
let _enter = span.enter();
|
||||
|
||||
// Search for that name in the list of children and return the FileAttrs.
|
||||
|
||||
// in the children, find the one with the desired name.
|
||||
if let Some((child_ino, _)) = children.iter().find(|e| e.1.get_name() == name.to_bytes()) {
|
||||
// lookup the child [InodeData] in [self.inode_tracker].
|
||||
// We know the inodes for children have already been allocated.
|
||||
let child_inode_data = self.inode_tracker.read().get(*child_ino).unwrap();
|
||||
|
||||
// Reply with the file attributes for the child.
|
||||
// For child directories, we still have all data we need to reply.
|
||||
Ok(fuse_backend_rs::api::filesystem::Entry {
|
||||
inode: *child_ino,
|
||||
attr: gen_file_attr(&child_inode_data, *child_ino).into(),
|
||||
attr_timeout: Duration::MAX,
|
||||
entry_timeout: Duration::MAX,
|
||||
..Default::default()
|
||||
})
|
||||
} else {
|
||||
// Child not found, return ENOENT.
|
||||
Err(io::Error::from_raw_os_error(libc::ENOENT))
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: readdirplus?
|
||||
|
||||
#[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset))]
|
||||
fn readdir(
|
||||
&self,
|
||||
_ctx: &Context,
|
||||
inode: Self::Inode,
|
||||
_handle: Self::Handle,
|
||||
_size: u32,
|
||||
offset: u64,
|
||||
add_entry: &mut dyn FnMut(fuse_backend_rs::api::filesystem::DirEntry) -> io::Result<usize>,
|
||||
) -> io::Result<()> {
|
||||
debug!("readdir");
|
||||
|
||||
if inode == ROOT_ID {
|
||||
if !self.list_root {
|
||||
return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
|
||||
} else {
|
||||
let root_nodes_provider = self.root_nodes_provider.clone();
|
||||
let (tx, mut rx) = mpsc::channel(16);
|
||||
|
||||
// This task will run in the background immediately and will exit
|
||||
// after the stream ends or if we no longer want any more entries.
|
||||
self.tokio_handle.spawn(async move {
|
||||
let mut stream = root_nodes_provider.list().skip(offset as usize).enumerate();
|
||||
while let Some(node) = stream.next().await {
|
||||
if tx.send(node).await.is_err() {
|
||||
// If we get a send error, it means the sync code
|
||||
// doesn't want any more entries.
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
while let Some((i, root_node)) = rx.blocking_recv() {
|
||||
let root_node = match root_node {
|
||||
Err(e) => {
|
||||
warn!("failed to retrieve pathinfo: {}", e);
|
||||
return Err(io::Error::from_raw_os_error(libc::EPERM));
|
||||
}
|
||||
Ok(root_node) => root_node,
|
||||
};
|
||||
|
||||
let name = root_node.get_name();
|
||||
// obtain the inode, or allocate a new one.
|
||||
let ino = self.get_inode_for_root_name(name).unwrap_or_else(|| {
|
||||
// insert the (sparse) inode data and register in
|
||||
// self.root_nodes.
|
||||
let ino = self.inode_tracker.write().put((&root_node).into());
|
||||
self.root_nodes.write().insert(name.into(), ino);
|
||||
ino
|
||||
});
|
||||
|
||||
let ty = match root_node {
|
||||
Node::Directory(_) => libc::S_IFDIR,
|
||||
Node::File(_) => libc::S_IFREG,
|
||||
Node::Symlink(_) => libc::S_IFLNK,
|
||||
};
|
||||
|
||||
let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry {
|
||||
ino,
|
||||
offset: offset + i as u64 + 1,
|
||||
type_: ty,
|
||||
name,
|
||||
})?;
|
||||
// If the buffer is full, add_entry will return `Ok(0)`.
|
||||
if written == 0 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// lookup the children, or return an error if it's not a directory.
|
||||
let (parent_digest, children) = self.get_directory_children(inode)?;
|
||||
|
||||
let span = info_span!("lookup", directory.digest = %parent_digest);
|
||||
let _enter = span.enter();
|
||||
|
||||
for (i, (ino, child_node)) in children.iter().skip(offset as usize).enumerate() {
|
||||
// the second parameter will become the "offset" parameter on the next call.
|
||||
let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry {
|
||||
ino: *ino,
|
||||
offset: offset + i as u64 + 1,
|
||||
type_: match child_node {
|
||||
#[allow(clippy::unnecessary_cast)]
|
||||
// libc::S_IFDIR is u32 on Linux and u16 on MacOS
|
||||
Node::Directory(_) => libc::S_IFDIR as u32,
|
||||
#[allow(clippy::unnecessary_cast)]
|
||||
// libc::S_IFDIR is u32 on Linux and u16 on MacOS
|
||||
Node::File(_) => libc::S_IFREG as u32,
|
||||
#[allow(clippy::unnecessary_cast)]
|
||||
// libc::S_IFDIR is u32 on Linux and u16 on MacOS
|
||||
Node::Symlink(_) => libc::S_IFLNK as u32,
|
||||
},
|
||||
name: child_node.get_name(),
|
||||
})?;
|
||||
// If the buffer is full, add_entry will return `Ok(0)`.
|
||||
if written == 0 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, fields(rq.inode = inode))]
|
||||
fn open(
|
||||
&self,
|
||||
_ctx: &Context,
|
||||
inode: Self::Inode,
|
||||
_flags: u32,
|
||||
_fuse_flags: u32,
|
||||
) -> io::Result<(
|
||||
Option<Self::Handle>,
|
||||
fuse_backend_rs::api::filesystem::OpenOptions,
|
||||
)> {
|
||||
if inode == ROOT_ID {
|
||||
return Err(io::Error::from_raw_os_error(libc::ENOSYS));
|
||||
}
|
||||
|
||||
// lookup the inode
|
||||
match *self.inode_tracker.read().get(inode).unwrap() {
|
||||
// read is invalid on non-files.
|
||||
InodeData::Directory(..) | InodeData::Symlink(_) => {
|
||||
warn!("is directory");
|
||||
Err(io::Error::from_raw_os_error(libc::EISDIR))
|
||||
}
|
||||
InodeData::Regular(ref blob_digest, _blob_size, _) => {
|
||||
let span = info_span!("read", blob.digest = %blob_digest);
|
||||
let _enter = span.enter();
|
||||
|
||||
let blob_service = self.blob_service.clone();
|
||||
let blob_digest = blob_digest.clone();
|
||||
|
||||
let task = self
|
||||
.tokio_handle
|
||||
.spawn(async move { blob_service.open_read(&blob_digest).await });
|
||||
|
||||
let blob_reader = self.tokio_handle.block_on(task).unwrap();
|
||||
|
||||
match blob_reader {
|
||||
Ok(None) => {
|
||||
warn!("blob not found");
|
||||
Err(io::Error::from_raw_os_error(libc::EIO))
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(e=?e, "error opening blob");
|
||||
Err(io::Error::from_raw_os_error(libc::EIO))
|
||||
}
|
||||
Ok(Some(blob_reader)) => {
|
||||
// get a new file handle
|
||||
// TODO: this will overflow after 2**64 operations,
|
||||
// which is fine for now.
|
||||
// See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1
|
||||
// for the discussion on alternatives.
|
||||
let fh = self.next_file_handle.fetch_add(1, Ordering::SeqCst);
|
||||
|
||||
debug!("add file handle {}", fh);
|
||||
self.file_handles
|
||||
.write()
|
||||
.insert(fh, Arc::new(tokio::sync::Mutex::new(blob_reader)));
|
||||
|
||||
Ok((
|
||||
Some(fh),
|
||||
fuse_backend_rs::api::filesystem::OpenOptions::empty(),
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, fields(rq.inode = inode, fh = handle))]
|
||||
fn release(
|
||||
&self,
|
||||
_ctx: &Context,
|
||||
inode: Self::Inode,
|
||||
_flags: u32,
|
||||
handle: Self::Handle,
|
||||
_flush: bool,
|
||||
_flock_release: bool,
|
||||
_lock_owner: Option<u64>,
|
||||
) -> io::Result<()> {
|
||||
// remove and get ownership on the blob reader
|
||||
match self.file_handles.write().remove(&handle) {
|
||||
// drop it, which will close it.
|
||||
Some(blob_reader) => drop(blob_reader),
|
||||
None => {
|
||||
// These might already be dropped if a read error occured.
|
||||
debug!("file_handle {} not found", handle);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset, rq.size = size))]
|
||||
fn read(
|
||||
&self,
|
||||
_ctx: &Context,
|
||||
inode: Self::Inode,
|
||||
handle: Self::Handle,
|
||||
w: &mut dyn fuse_backend_rs::api::filesystem::ZeroCopyWriter,
|
||||
size: u32,
|
||||
offset: u64,
|
||||
_lock_owner: Option<u64>,
|
||||
_flags: u32,
|
||||
) -> io::Result<usize> {
|
||||
debug!("read");
|
||||
|
||||
// We need to take out the blob reader from self.file_handles, so we can
|
||||
// interact with it in the separate task.
|
||||
// On success, we pass it back out of the task, so we can put it back in self.file_handles.
|
||||
let blob_reader = match self.file_handles.read().get(&handle) {
|
||||
Some(blob_reader) => blob_reader.clone(),
|
||||
None => {
|
||||
warn!("file handle {} unknown", handle);
|
||||
return Err(io::Error::from_raw_os_error(libc::EIO));
|
||||
}
|
||||
};
|
||||
|
||||
let task = self.tokio_handle.spawn(async move {
|
||||
let mut blob_reader = blob_reader.lock().await;
|
||||
|
||||
// seek to the offset specified, which is relative to the start of the file.
|
||||
let resp = blob_reader.seek(io::SeekFrom::Start(offset)).await;
|
||||
|
||||
match resp {
|
||||
Ok(pos) => {
|
||||
debug_assert_eq!(offset, pos);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("failed to seek to offset {}: {}", offset, e);
|
||||
return Err(io::Error::from_raw_os_error(libc::EIO));
|
||||
}
|
||||
}
|
||||
|
||||
// As written in the fuse docs, read should send exactly the number
|
||||
// of bytes requested except on EOF or error.
|
||||
|
||||
let mut buf: Vec<u8> = Vec::with_capacity(size as usize);
|
||||
|
||||
// copy things from the internal buffer into buf to fill it till up until size
|
||||
tokio::io::copy(&mut blob_reader.as_mut().take(size as u64), &mut buf).await?;
|
||||
|
||||
Ok(buf)
|
||||
});
|
||||
|
||||
let buf = self.tokio_handle.block_on(task).unwrap()?;
|
||||
|
||||
w.write(&buf)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, fields(rq.inode = inode))]
|
||||
fn readlink(&self, _ctx: &Context, inode: Self::Inode) -> io::Result<Vec<u8>> {
|
||||
if inode == ROOT_ID {
|
||||
return Err(io::Error::from_raw_os_error(libc::ENOSYS));
|
||||
}
|
||||
|
||||
// lookup the inode
|
||||
match *self.inode_tracker.read().get(inode).unwrap() {
|
||||
InodeData::Directory(..) | InodeData::Regular(..) => {
|
||||
Err(io::Error::from_raw_os_error(libc::EINVAL))
|
||||
}
|
||||
InodeData::Symlink(ref target) => Ok(target.to_vec()),
|
||||
}
|
||||
}
|
||||
}
|
18
tvix/castore/src/fs/root_nodes.rs
Normal file
18
tvix/castore/src/fs/root_nodes.rs
Normal file
|
@ -0,0 +1,18 @@
|
|||
use std::pin::Pin;
|
||||
|
||||
use crate::{proto::node::Node, Error};
|
||||
use futures::Stream;
|
||||
use tonic::async_trait;
|
||||
|
||||
/// Provides an interface for looking up root nodes in tvix-castore, given a
|
||||
/// lookup key (usually the basename), and optionally allows listing them.
/// Implementations have to be `Send + Sync`.
|
||||
#[async_trait]
|
||||
pub trait RootNodes: Send + Sync {
|
||||
/// Looks up a root CA node based on the basename of the node in the root
|
||||
/// directory of the filesystem.
/// NOTE(review): `Ok(None)` presumably means "no root node with that
/// name" - confirm against implementations.
|
||||
async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error>;
|
||||
|
||||
/// Lists all root CA nodes in the filesystem, as a stream of results.
|
||||
/// An error can be returned in case listing is not allowed.
|
||||
fn list(&self) -> Pin<Box<dyn Stream<Item = Result<Node, Error>> + Send>>;
|
||||
}
|
237
tvix/castore/src/fs/virtiofs.rs
Normal file
237
tvix/castore/src/fs/virtiofs.rs
Normal file
|
@ -0,0 +1,237 @@
|
|||
use std::{
|
||||
convert, error, fmt, io,
|
||||
ops::Deref,
|
||||
path::Path,
|
||||
sync::{Arc, MutexGuard, RwLock},
|
||||
};
|
||||
|
||||
use fuse_backend_rs::{
|
||||
api::{filesystem::FileSystem, server::Server},
|
||||
transport::{FsCacheReqHandler, Reader, VirtioFsWriter},
|
||||
};
|
||||
use tracing::error;
|
||||
use vhost::vhost_user::{
|
||||
Listener, SlaveFsCacheReq, VhostUserProtocolFeatures, VhostUserVirtioFeatures,
|
||||
};
|
||||
use vhost_user_backend::{VhostUserBackendMut, VhostUserDaemon, VringMutex, VringState, VringT};
|
||||
use virtio_bindings::bindings::virtio_ring::{
|
||||
VIRTIO_RING_F_EVENT_IDX, VIRTIO_RING_F_INDIRECT_DESC,
|
||||
};
|
||||
use virtio_queue::QueueT;
|
||||
use vm_memory::{GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap};
|
||||
use vmm_sys_util::epoll::EventSet;
|
||||
|
||||
/// Bit position of the VIRTIO_F_VERSION_1 feature; used as a shift amount
/// when building the feature mask returned by `features()`.
const VIRTIO_F_VERSION_1: u32 = 32;
|
||||
/// Number of queues reported by `num_queues()`. `handle_event` serves
/// device event 0 (high priority) and 1 (regular priority).
const NUM_QUEUES: usize = 2;
|
||||
/// Maximum queue size reported by `max_queue_size()`.
const QUEUE_SIZE: usize = 1024;
|
||||
|
||||
/// Errors raised by the vhost-user filesystem backend in this module.
/// They are converted into [io::Error] (kind `Other`) before leaving it.
#[derive(Debug)]
|
||||
enum Error {
|
||||
/// Failed to handle non-input event.
/// Returned by `handle_event` if the event set is not `EventSet::IN`.
|
||||
HandleEventNotEpollIn,
|
||||
/// Failed to handle unknown event.
/// Returned by `handle_event` for device events other than 0 and 1.
|
||||
HandleEventUnknownEvent,
|
||||
/// Invalid descriptor chain.
/// A reader or writer could not be constructed from the chain.
|
||||
InvalidDescriptorChain,
|
||||
/// Failed to handle filesystem requests.
/// Wraps the error returned by the FUSE server's `handle_message`.
|
||||
HandleRequests(fuse_backend_rs::Error),
|
||||
/// Failed to construct new vhost user daemon.
|
||||
NewDaemon,
|
||||
/// Failed to start the vhost user daemon.
|
||||
StartDaemon,
|
||||
/// Failed to wait for the vhost user daemon.
|
||||
WaitDaemon,
|
||||
}
|
||||
|
||||
impl fmt::Display for Error {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "vhost_user_fs_error: {self:?}")
|
||||
}
|
||||
}
|
||||
|
||||
// No methods overridden; `Debug` is derived and `Display` is implemented
// above. The `From<Error> for io::Error` impl below hands `Error` to
// `io::Error::new` as its payload.
impl error::Error for Error {}
|
||||
|
||||
impl convert::From<Error> for io::Error {
|
||||
fn from(e: Error) -> Self {
|
||||
io::Error::new(io::ErrorKind::Other, e)
|
||||
}
|
||||
}
|
||||
|
||||
struct VhostUserFsBackend<FS>
|
||||
where
|
||||
FS: FileSystem + Send + Sync,
|
||||
{
|
||||
server: Arc<Server<Arc<FS>>>,
|
||||
event_idx: bool,
|
||||
guest_mem: GuestMemoryAtomic<GuestMemoryMmap>,
|
||||
cache_req: Option<SlaveFsCacheReq>,
|
||||
}
|
||||
|
||||
impl<FS> VhostUserFsBackend<FS>
|
||||
where
|
||||
FS: FileSystem + Send + Sync,
|
||||
{
|
||||
fn process_queue(&mut self, vring: &mut MutexGuard<VringState>) -> std::io::Result<bool> {
|
||||
let mut used_descs = false;
|
||||
|
||||
while let Some(desc_chain) = vring
|
||||
.get_queue_mut()
|
||||
.pop_descriptor_chain(self.guest_mem.memory())
|
||||
{
|
||||
let memory = desc_chain.memory();
|
||||
let reader = Reader::from_descriptor_chain(memory, desc_chain.clone())
|
||||
.map_err(|_| Error::InvalidDescriptorChain)?;
|
||||
let writer = VirtioFsWriter::new(memory, desc_chain.clone())
|
||||
.map_err(|_| Error::InvalidDescriptorChain)?;
|
||||
|
||||
self.server
|
||||
.handle_message(
|
||||
reader,
|
||||
writer.into(),
|
||||
self.cache_req
|
||||
.as_mut()
|
||||
.map(|req| req as &mut dyn FsCacheReqHandler),
|
||||
None,
|
||||
)
|
||||
.map_err(Error::HandleRequests)?;
|
||||
|
||||
// TODO: Is len 0 correct?
|
||||
if let Err(error) = vring
|
||||
.get_queue_mut()
|
||||
.add_used(memory, desc_chain.head_index(), 0)
|
||||
{
|
||||
error!(?error, "failed to add desc back to ring");
|
||||
}
|
||||
|
||||
// TODO: What happens if we error out before here?
|
||||
used_descs = true;
|
||||
}
|
||||
|
||||
let needs_notification = if self.event_idx {
|
||||
match vring
|
||||
.get_queue_mut()
|
||||
.needs_notification(self.guest_mem.memory().deref())
|
||||
{
|
||||
Ok(needs_notification) => needs_notification,
|
||||
Err(error) => {
|
||||
error!(?error, "failed to check if queue needs notification");
|
||||
true
|
||||
}
|
||||
}
|
||||
} else {
|
||||
true
|
||||
};
|
||||
|
||||
if needs_notification {
|
||||
if let Err(error) = vring.signal_used_queue() {
|
||||
error!(?error, "failed to signal used queue");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(used_descs)
|
||||
}
|
||||
}
|
||||
|
||||
impl<FS> VhostUserBackendMut<VringMutex> for VhostUserFsBackend<FS>
|
||||
where
|
||||
FS: FileSystem + Send + Sync,
|
||||
{
|
||||
fn num_queues(&self) -> usize {
|
||||
NUM_QUEUES
|
||||
}
|
||||
|
||||
fn max_queue_size(&self) -> usize {
|
||||
QUEUE_SIZE
|
||||
}
|
||||
|
||||
fn features(&self) -> u64 {
|
||||
1 << VIRTIO_F_VERSION_1
|
||||
| 1 << VIRTIO_RING_F_INDIRECT_DESC
|
||||
| 1 << VIRTIO_RING_F_EVENT_IDX
|
||||
| VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits()
|
||||
}
|
||||
|
||||
fn protocol_features(&self) -> VhostUserProtocolFeatures {
|
||||
VhostUserProtocolFeatures::MQ | VhostUserProtocolFeatures::SLAVE_REQ
|
||||
}
|
||||
|
||||
fn set_event_idx(&mut self, enabled: bool) {
|
||||
self.event_idx = enabled;
|
||||
}
|
||||
|
||||
fn update_memory(&mut self, _mem: GuestMemoryAtomic<GuestMemoryMmap>) -> std::io::Result<()> {
|
||||
// This is what most the vhost user implementations do...
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_slave_req_fd(&mut self, cache_req: SlaveFsCacheReq) {
|
||||
self.cache_req = Some(cache_req);
|
||||
}
|
||||
|
||||
fn handle_event(
|
||||
&mut self,
|
||||
device_event: u16,
|
||||
evset: vmm_sys_util::epoll::EventSet,
|
||||
vrings: &[VringMutex],
|
||||
_thread_id: usize,
|
||||
) -> std::io::Result<bool> {
|
||||
if evset != EventSet::IN {
|
||||
return Err(Error::HandleEventNotEpollIn.into());
|
||||
}
|
||||
|
||||
let mut queue = match device_event {
|
||||
// High priority queue
|
||||
0 => vrings[0].get_mut(),
|
||||
// Regurlar priority queue
|
||||
1 => vrings[1].get_mut(),
|
||||
_ => {
|
||||
return Err(Error::HandleEventUnknownEvent.into());
|
||||
}
|
||||
};
|
||||
|
||||
if self.event_idx {
|
||||
loop {
|
||||
queue
|
||||
.get_queue_mut()
|
||||
.enable_notification(self.guest_mem.memory().deref())
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
|
||||
if !self.process_queue(&mut queue)? {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
self.process_queue(&mut queue)?;
|
||||
}
|
||||
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start_virtiofs_daemon<FS, P>(fs: FS, socket: P) -> io::Result<()>
|
||||
where
|
||||
FS: FileSystem + Send + Sync + 'static,
|
||||
P: AsRef<Path>,
|
||||
{
|
||||
let guest_mem = GuestMemoryAtomic::new(GuestMemoryMmap::new());
|
||||
|
||||
let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs)));
|
||||
|
||||
let backend = Arc::new(RwLock::new(VhostUserFsBackend {
|
||||
server,
|
||||
guest_mem: guest_mem.clone(),
|
||||
event_idx: false,
|
||||
cache_req: None,
|
||||
}));
|
||||
|
||||
let listener = Listener::new(socket, true).unwrap();
|
||||
|
||||
let mut fs_daemon =
|
||||
VhostUserDaemon::new(String::from("vhost-user-fs-tvix-store"), backend, guest_mem)
|
||||
.map_err(|_| Error::NewDaemon)?;
|
||||
|
||||
fs_daemon.start(listener).map_err(|_| Error::StartDaemon)?;
|
||||
|
||||
fs_daemon.wait().map_err(|_| Error::WaitDaemon)?;
|
||||
|
||||
Ok(())
|
||||
}
|
|
@ -4,6 +4,10 @@ mod errors;
|
|||
pub mod blobservice;
|
||||
pub mod directoryservice;
|
||||
pub mod fixtures;
|
||||
|
||||
#[cfg(feature = "fs")]
|
||||
pub mod fs;
|
||||
|
||||
pub mod import;
|
||||
pub mod proto;
|
||||
pub mod tonic;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue