feat(tvix/castore/fs): support extended attributes
This exposes `user.tvix.castore.{blob,directory}.digest` xattr keys for files and directories: ``` ❯ getfattr -d /tmp/tvix/06jrrv6wwp0nc1m7fr5bgdw012rfzfx2-nano-7.2-info getfattr: Removing leading '/' from absolute path names user.tvix.castore.directory.digest="b3:SuYDcUM9RpWcnA40tYB1BtYpR0xw72v3ymhKDQbBfe4=" ❯ getfattr -d /tmp/tvix/156a89x10c3kaby9rgf3fi4k0p6r9wl1-etc-shells getfattr: Removing leading '/' from absolute path names user.tvix.castore.blob.digest="b3:pZkwZoHN+/VQ8wkaX0wYVXZ0tV/HhtKlSqiaWDK7uRs=" ``` It's currently mostly used for debugging, though it might be useful for tvix-castore-aware syncing programs using the filesystem too. Change-Id: I26ac3cb9fe51ffbf7f880519f26741549cb5ab6a Reviewed-on: https://cl.tvl.fyi/c/depot/+/11422 Autosubmit: flokli <flokli@flokli.de> Reviewed-by: raitobezarius <tvl@lahfa.xyz> Tested-by: BuildkiteCI Reviewed-by: Brian Olsen <me@griff.name>
This commit is contained in:
parent
23871649bb
commit
515bfa18fb
7 changed files with 227 additions and 5 deletions
1
tvix/Cargo.lock
generated
1
tvix/Cargo.lock
generated
|
@ -4380,6 +4380,7 @@ dependencies = [
|
|||
"vm-memory",
|
||||
"vmm-sys-util",
|
||||
"walkdir",
|
||||
"xattr",
|
||||
"zstd",
|
||||
]
|
||||
|
||||
|
|
|
@ -13953,6 +13953,10 @@ rec {
|
|||
name = "tokio-retry";
|
||||
packageId = "tokio-retry";
|
||||
}
|
||||
{
|
||||
name = "xattr";
|
||||
packageId = "xattr";
|
||||
}
|
||||
];
|
||||
features = {
|
||||
"cloud" = [ "dep:bigtable_rs" "object_store/aws" "object_store/azure" "object_store/gcp" ];
|
||||
|
|
|
@ -86,6 +86,7 @@ tempfile = "3.3.0"
|
|||
tokio-retry = "0.3.0"
|
||||
hex-literal = "0.4.1"
|
||||
rstest_reuse = "0.6.0"
|
||||
xattr = "1.3.1"
|
||||
|
||||
[features]
|
||||
default = []
|
||||
|
|
|
@ -19,10 +19,14 @@ use crate::{
|
|||
proto::{node::Node, NamedNode},
|
||||
B3Digest,
|
||||
};
|
||||
use bstr::ByteVec;
|
||||
use fuse_backend_rs::abi::fuse_abi::stat64;
|
||||
use fuse_backend_rs::api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID};
|
||||
use fuse_backend_rs::api::filesystem::{
|
||||
Context, FileSystem, FsOptions, GetxattrReply, ListxattrReply, ROOT_ID,
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use parking_lot::RwLock;
|
||||
use std::ffi::CStr;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
io,
|
||||
|
@ -83,6 +87,9 @@ pub struct TvixStoreFs<BS, DS, RN> {
|
|||
/// Whether to (try) listing elements in the root.
|
||||
list_root: bool,
|
||||
|
||||
/// Whether to expose blob and directory digests as extended attributes.
|
||||
show_xattr: bool,
|
||||
|
||||
/// This maps a given basename in the root to the inode we allocated for the node.
|
||||
root_nodes: RwLock<HashMap<Vec<u8>, u64>>,
|
||||
|
||||
|
@ -109,6 +116,7 @@ where
|
|||
directory_service: DS,
|
||||
root_nodes_provider: RN,
|
||||
list_root: bool,
|
||||
show_xattr: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
blob_service,
|
||||
|
@ -116,6 +124,7 @@ where
|
|||
root_nodes_provider,
|
||||
|
||||
list_root,
|
||||
show_xattr,
|
||||
|
||||
root_nodes: RwLock::new(HashMap::default()),
|
||||
inode_tracker: RwLock::new(Default::default()),
|
||||
|
@ -267,6 +276,9 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
const XATTR_NAME_DIRECTORY_DIGEST: &[u8] = b"user.tvix.castore.directory.digest";
|
||||
const XATTR_NAME_BLOB_DIGEST: &[u8] = b"user.tvix.castore.blob.digest";
|
||||
|
||||
impl<BS, DS, RN> FileSystem for TvixStoreFs<BS, DS, RN>
|
||||
where
|
||||
BS: AsRef<dyn BlobService> + Clone + Send + 'static,
|
||||
|
@ -628,4 +640,85 @@ where
|
|||
InodeData::Symlink(ref target) => Ok(target.to_vec()),
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, fields(rq.inode = inode, name=?name))]
|
||||
fn getxattr(
|
||||
&self,
|
||||
_ctx: &Context,
|
||||
inode: Self::Inode,
|
||||
name: &CStr,
|
||||
size: u32,
|
||||
) -> io::Result<GetxattrReply> {
|
||||
if !self.show_xattr {
|
||||
return Err(io::Error::from_raw_os_error(libc::ENOSYS));
|
||||
}
|
||||
|
||||
// Peek at the inode requested, and construct the response.
|
||||
let digest_str = match *self
|
||||
.inode_tracker
|
||||
.read()
|
||||
.get(inode)
|
||||
.ok_or_else(|| io::Error::from_raw_os_error(libc::ENODATA))?
|
||||
{
|
||||
InodeData::Directory(DirectoryInodeData::Sparse(ref digest, _))
|
||||
| InodeData::Directory(DirectoryInodeData::Populated(ref digest, _))
|
||||
if name.to_bytes() == XATTR_NAME_DIRECTORY_DIGEST =>
|
||||
{
|
||||
digest.to_string()
|
||||
}
|
||||
InodeData::Regular(ref digest, _, _) if name.to_bytes() == XATTR_NAME_BLOB_DIGEST => {
|
||||
digest.to_string()
|
||||
}
|
||||
_ => {
|
||||
return Err(io::Error::from_raw_os_error(libc::ENODATA));
|
||||
}
|
||||
};
|
||||
|
||||
if size == 0 {
|
||||
Ok(GetxattrReply::Count(digest_str.len() as u32))
|
||||
} else if size < digest_str.len() as u32 {
|
||||
Err(io::Error::from_raw_os_error(libc::ERANGE))
|
||||
} else {
|
||||
Ok(GetxattrReply::Value(digest_str.into_bytes()))
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, fields(rq.inode = inode))]
|
||||
fn listxattr(
|
||||
&self,
|
||||
_ctx: &Context,
|
||||
inode: Self::Inode,
|
||||
size: u32,
|
||||
) -> io::Result<ListxattrReply> {
|
||||
if !self.show_xattr {
|
||||
return Err(io::Error::from_raw_os_error(libc::ENOSYS));
|
||||
}
|
||||
|
||||
// determine the (\0-terminated list) to of xattr keys present, depending on the type of the inode.
|
||||
let xattrs_names = {
|
||||
let mut out = Vec::new();
|
||||
if let Some(inode_data) = self.inode_tracker.read().get(inode) {
|
||||
match *inode_data {
|
||||
InodeData::Directory(_) => {
|
||||
out.extend_from_slice(XATTR_NAME_DIRECTORY_DIGEST);
|
||||
out.push_byte(b'\x00');
|
||||
}
|
||||
InodeData::Regular(..) => {
|
||||
out.extend_from_slice(XATTR_NAME_BLOB_DIGEST);
|
||||
out.push_byte(b'\x00');
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
out
|
||||
};
|
||||
|
||||
if size == 0 {
|
||||
Ok(ListxattrReply::Count(xattrs_names.len() as u32))
|
||||
} else if size < xattrs_names.len() as u32 {
|
||||
Err(io::Error::from_raw_os_error(libc::ERANGE))
|
||||
} else {
|
||||
Ok(ListxattrReply::Names(xattrs_names.to_vec()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,18 +1,19 @@
|
|||
use bstr::ByteSlice;
|
||||
use bytes::Bytes;
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
ffi::{OsStr, OsString},
|
||||
io::{self, Cursor},
|
||||
os::unix::fs::MetadataExt,
|
||||
os::unix::{ffi::OsStrExt, fs::MetadataExt},
|
||||
path::Path,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use bytes::Bytes;
|
||||
use tempfile::TempDir;
|
||||
use tokio_stream::{wrappers::ReadDirStream, StreamExt};
|
||||
|
||||
use super::{fuse::FuseDaemon, TvixStoreFs};
|
||||
use crate::proto as castorepb;
|
||||
use crate::proto::node::Node;
|
||||
use crate::proto::{self as castorepb};
|
||||
use crate::{
|
||||
blobservice::{BlobService, MemoryBlobService},
|
||||
directoryservice::{DirectoryService, MemoryDirectoryService},
|
||||
|
@ -40,6 +41,7 @@ fn do_mount<P: AsRef<Path>, BS, DS>(
|
|||
root_nodes: BTreeMap<bytes::Bytes, Node>,
|
||||
mountpoint: P,
|
||||
list_root: bool,
|
||||
show_xattr: bool,
|
||||
) -> io::Result<FuseDaemon>
|
||||
where
|
||||
BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static,
|
||||
|
@ -50,6 +52,7 @@ where
|
|||
directory_service,
|
||||
Arc::new(root_nodes),
|
||||
list_root,
|
||||
show_xattr,
|
||||
);
|
||||
FuseDaemon::new(Arc::new(fs), mountpoint.as_ref(), 4, false)
|
||||
}
|
||||
|
@ -250,6 +253,7 @@ async fn mount() {
|
|||
BTreeMap::default(),
|
||||
tmpdir.path(),
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.expect("must succeed");
|
||||
|
||||
|
@ -272,6 +276,7 @@ async fn root() {
|
|||
BTreeMap::default(),
|
||||
tmpdir.path(),
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.expect("must succeed");
|
||||
|
||||
|
@ -311,6 +316,7 @@ async fn root_with_listing() {
|
|||
root_nodes,
|
||||
tmpdir.path(),
|
||||
true, /* allow listing */
|
||||
false,
|
||||
)
|
||||
.expect("must succeed");
|
||||
|
||||
|
@ -354,6 +360,7 @@ async fn stat_file_at_root() {
|
|||
root_nodes,
|
||||
tmpdir.path(),
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.expect("must succeed");
|
||||
|
||||
|
@ -390,6 +397,7 @@ async fn read_file_at_root() {
|
|||
root_nodes,
|
||||
tmpdir.path(),
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.expect("must succeed");
|
||||
|
||||
|
@ -426,6 +434,7 @@ async fn read_large_file_at_root() {
|
|||
root_nodes,
|
||||
tmpdir.path(),
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.expect("must succeed");
|
||||
|
||||
|
@ -470,6 +479,7 @@ async fn symlink_readlink() {
|
|||
root_nodes,
|
||||
tmpdir.path(),
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.expect("must succeed");
|
||||
|
||||
|
@ -516,6 +526,7 @@ async fn read_stat_through_symlink() {
|
|||
root_nodes,
|
||||
tmpdir.path(),
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.expect("must succeed");
|
||||
|
||||
|
@ -560,6 +571,7 @@ async fn read_stat_directory() {
|
|||
root_nodes,
|
||||
tmpdir.path(),
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.expect("must succeed");
|
||||
|
||||
|
@ -573,6 +585,91 @@ async fn read_stat_directory() {
|
|||
fuse_daemon.unmount().expect("unmount");
|
||||
}
|
||||
|
||||
/// Read a directory and file in the root, and ensure the xattrs expose blob or
|
||||
/// directory digests.
|
||||
#[tokio::test]
|
||||
async fn xattr() {
|
||||
// https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
|
||||
if !std::path::Path::new("/dev/fuse").exists() {
|
||||
eprintln!("skipping test");
|
||||
return;
|
||||
}
|
||||
let tmpdir = TempDir::new().unwrap();
|
||||
|
||||
let (blob_service, directory_service) = gen_svcs();
|
||||
let mut root_nodes = BTreeMap::default();
|
||||
|
||||
populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await;
|
||||
populate_blob_a(&blob_service, &mut root_nodes).await;
|
||||
|
||||
let mut fuse_daemon = do_mount(
|
||||
blob_service,
|
||||
directory_service,
|
||||
root_nodes,
|
||||
tmpdir.path(),
|
||||
false,
|
||||
true, /* support xattr */
|
||||
)
|
||||
.expect("must succeed");
|
||||
|
||||
// peek at the directory
|
||||
{
|
||||
let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME);
|
||||
|
||||
let xattr_names: Vec<OsString> = xattr::list(&p).expect("must succeed").collect();
|
||||
// There should be 1 key, XATTR_NAME_DIRECTORY_DIGEST.
|
||||
assert_eq!(1, xattr_names.len(), "there should be 1 xattr name");
|
||||
assert_eq!(
|
||||
super::XATTR_NAME_DIRECTORY_DIGEST,
|
||||
xattr_names.first().unwrap().as_encoded_bytes()
|
||||
);
|
||||
|
||||
// The key should equal to the string-formatted b3 digest.
|
||||
let val = xattr::get(&p, OsStr::from_bytes(super::XATTR_NAME_DIRECTORY_DIGEST))
|
||||
.expect("must succeed")
|
||||
.expect("must be some");
|
||||
assert_eq!(
|
||||
fixtures::DIRECTORY_WITH_KEEP
|
||||
.digest()
|
||||
.to_string()
|
||||
.as_bytes()
|
||||
.as_bstr(),
|
||||
val.as_bstr()
|
||||
);
|
||||
|
||||
// Reading another xattr key is gonna return None.
|
||||
let val = xattr::get(&p, OsStr::from_bytes(b"user.cheesecake")).expect("must succeed");
|
||||
assert_eq!(None, val);
|
||||
}
|
||||
// peek at the file
|
||||
{
|
||||
let p = tmpdir.path().join(BLOB_A_NAME);
|
||||
|
||||
let xattr_names: Vec<OsString> = xattr::list(&p).expect("must succeed").collect();
|
||||
// There should be 1 key, XATTR_NAME_BLOB_DIGEST.
|
||||
assert_eq!(1, xattr_names.len(), "there should be 1 xattr name");
|
||||
assert_eq!(
|
||||
super::XATTR_NAME_BLOB_DIGEST,
|
||||
xattr_names.first().unwrap().as_encoded_bytes()
|
||||
);
|
||||
|
||||
// The key should equal to the string-formatted b3 digest.
|
||||
let val = xattr::get(&p, OsStr::from_bytes(super::XATTR_NAME_BLOB_DIGEST))
|
||||
.expect("must succeed")
|
||||
.expect("must be some");
|
||||
assert_eq!(
|
||||
fixtures::BLOB_A_DIGEST.to_string().as_bytes().as_bstr(),
|
||||
val.as_bstr()
|
||||
);
|
||||
|
||||
// Reading another xattr key is gonna return None.
|
||||
let val = xattr::get(&p, OsStr::from_bytes(b"user.cheesecake")).expect("must succeed");
|
||||
assert_eq!(None, val);
|
||||
}
|
||||
|
||||
fuse_daemon.unmount().expect("unmount");
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
/// Read a blob inside a directory. This ensures we successfully populate directory data.
|
||||
async fn read_blob_inside_dir() {
|
||||
|
@ -594,6 +691,7 @@ async fn read_blob_inside_dir() {
|
|||
root_nodes,
|
||||
tmpdir.path(),
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.expect("must succeed");
|
||||
|
||||
|
@ -633,6 +731,7 @@ async fn read_blob_deep_inside_dir() {
|
|||
root_nodes,
|
||||
tmpdir.path(),
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.expect("must succeed");
|
||||
|
||||
|
@ -675,6 +774,7 @@ async fn readdir() {
|
|||
root_nodes,
|
||||
tmpdir.path(),
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.expect("must succeed");
|
||||
|
||||
|
@ -734,6 +834,7 @@ async fn readdir_deep() {
|
|||
root_nodes,
|
||||
tmpdir.path(),
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.expect("must succeed");
|
||||
|
||||
|
@ -783,6 +884,7 @@ async fn check_attributes() {
|
|||
root_nodes,
|
||||
tmpdir.path(),
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.expect("must succeed");
|
||||
|
||||
|
@ -857,6 +959,7 @@ async fn compare_inodes_directories() {
|
|||
root_nodes,
|
||||
tmpdir.path(),
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.expect("must succeed");
|
||||
|
||||
|
@ -900,6 +1003,7 @@ async fn compare_inodes_files() {
|
|||
root_nodes,
|
||||
tmpdir.path(),
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.expect("must succeed");
|
||||
|
||||
|
@ -948,6 +1052,7 @@ async fn compare_inodes_symlinks() {
|
|||
root_nodes,
|
||||
tmpdir.path(),
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.expect("must succeed");
|
||||
|
||||
|
@ -990,6 +1095,7 @@ async fn read_wrong_paths_in_root() {
|
|||
root_nodes,
|
||||
tmpdir.path(),
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.expect("must succeed");
|
||||
|
||||
|
@ -1044,6 +1150,7 @@ async fn disallow_writes() {
|
|||
root_nodes,
|
||||
tmpdir.path(),
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.expect("must succeed");
|
||||
|
||||
|
@ -1075,6 +1182,7 @@ async fn missing_directory() {
|
|||
root_nodes,
|
||||
tmpdir.path(),
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.expect("must succeed");
|
||||
|
||||
|
@ -1122,6 +1230,7 @@ async fn missing_blob() {
|
|||
root_nodes,
|
||||
tmpdir.path(),
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.expect("must succeed");
|
||||
|
||||
|
|
|
@ -161,6 +161,10 @@ enum Commands {
|
|||
/// (exhaustive) listing.
|
||||
#[clap(long, short, action)]
|
||||
list_root: bool,
|
||||
|
||||
#[arg(long, default_value_t = true)]
|
||||
/// Whether to expose blob and directory digests as extended attributes.
|
||||
show_xattr: bool,
|
||||
},
|
||||
/// Starts a tvix-store virtiofs daemon at the given socket path.
|
||||
#[cfg(feature = "virtiofs")]
|
||||
|
@ -183,6 +187,10 @@ enum Commands {
|
|||
/// (exhaustive) listing.
|
||||
#[clap(long, short, action)]
|
||||
list_root: bool,
|
||||
|
||||
#[arg(long, default_value_t = true)]
|
||||
/// Whether to expose blob and directory digests as extended attributes.
|
||||
show_xattr: bool,
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -459,6 +467,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||
list_root,
|
||||
threads,
|
||||
allow_other,
|
||||
show_xattr,
|
||||
} => {
|
||||
let (blob_service, directory_service, path_info_service) =
|
||||
tvix_store::utils::construct_services(
|
||||
|
@ -474,6 +483,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||
directory_service,
|
||||
Arc::from(path_info_service),
|
||||
list_root,
|
||||
show_xattr,
|
||||
);
|
||||
info!(mount_path=?dest, "mounting");
|
||||
|
||||
|
@ -499,6 +509,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||
directory_service_addr,
|
||||
path_info_service_addr,
|
||||
list_root,
|
||||
show_xattr,
|
||||
} => {
|
||||
let (blob_service, directory_service, path_info_service) =
|
||||
tvix_store::utils::construct_services(
|
||||
|
@ -514,6 +525,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||
directory_service,
|
||||
Arc::from(path_info_service),
|
||||
list_root,
|
||||
show_xattr,
|
||||
);
|
||||
info!(socket_path=?socket, "starting virtiofs-daemon");
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@ pub fn make_fs<BS, DS, PS>(
|
|||
directory_service: DS,
|
||||
path_info_service: PS,
|
||||
list_root: bool,
|
||||
show_xattr: bool,
|
||||
) -> TvixStoreFs<BS, DS, RootNodesWrapper<PS>>
|
||||
where
|
||||
BS: AsRef<dyn BlobService> + Send + Clone + 'static,
|
||||
|
@ -28,6 +29,7 @@ where
|
|||
directory_service,
|
||||
RootNodesWrapper(path_info_service),
|
||||
list_root,
|
||||
show_xattr,
|
||||
)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue