diff --git a/tvix/castore/src/directoryservice/closure_validator.rs b/tvix/castore/src/directoryservice/closure_validator.rs new file mode 100644 index 000000000..ecd9acfca --- /dev/null +++ b/tvix/castore/src/directoryservice/closure_validator.rs @@ -0,0 +1,254 @@ +use std::collections::{HashMap, HashSet}; + +use bstr::ByteSlice; + +use tracing::{instrument, trace}; + +use crate::{ + proto::{self, Directory}, + B3Digest, Error, +}; + +/// This can be used to validate a Directory closure (DAG of connected +/// Directories), and their insertion order. +/// +/// Directories need to be inserted (via `add`), in an order from the leaves to +/// the root. During insertion, We validate as much as we can at that time: +/// +/// - individual validation of Directory messages +/// - validation of insertion order (no upload of not-yet-known Directories) +/// - validation of size fields of referred Directories +/// +/// Internally it keeps all received Directories (and their sizes) in a HashMap, +/// keyed by digest. +/// +/// Once all Directories have been inserted, a finalize function can be +/// called to get a (deduplicated and) validated list of directories, in +/// insertion order. +/// During finalize, a check for graph connectivity is performed too, to ensure +/// there's no disconnected components, and only one root. +#[derive(Default)] +pub struct ClosureValidator { + directories_and_sizes: HashMap, + + /// Keeps track of the last-inserted directory digest. Used to start the + /// connectivity check. + last_directory_digest: Option, +} + +impl ClosureValidator { + /// Insert a new Directory into the closure. + /// Perform individual Directory validation, validation of insertion order + // and size fields. + #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] + pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> { + let digest = directory.digest(); + + if self.directories_and_sizes.contains_key(&digest) { + trace!("already seen, skipping"); + return Ok(()); + } + + // Do some general validation + directory + .validate() + .map_err(|e| Error::InvalidRequest(e.to_string()))?; + + // Ensure the directory only refers to directories which we already accepted. + for dir in &directory.directories { + let dir_dgst = B3Digest::try_from(dir.digest.to_owned()).unwrap(); // validated + + // Ensure the digest has already been seen + let (_, recorded_dir_size) = + self.directories_and_sizes.get(&dir_dgst).ok_or_else(|| { + Error::InvalidRequest(format!( + "'{}' refers to unseen child dir: {}", + dir.name.as_bstr(), + dir_dgst + )) + })?; + + // Ensure the size specified in the child node matches our records. + if dir.size != *recorded_dir_size { + return Err(Error::InvalidRequest(format!( + "'{}' has wrong size, specified {}, recorded {}", + dir.name.as_bstr(), + dir.size, + recorded_dir_size + ))); + } + } + + trace!("inserting"); + let directory_size = directory.size(); + self.directories_and_sizes + .insert(digest.clone(), (directory, directory_size)); + self.last_directory_digest = Some(digest); + + Ok(()) + } + + /// Ensure that all inserted Directories are connected, then return a + /// (deduplicated) and validated list of directories, in from-leaves-to-root + /// order. + /// In case no elements have been inserted, returns an empty list. + #[instrument(level = "trace", skip_all, err)] + pub(crate) fn finalize(mut self) -> Result, Error> { + if self.last_directory_digest.is_none() { + return Ok(vec![]); + } + let root_digest = self.last_directory_digest.unwrap(); + + // recursively walk all directories reachable from there, + // ensure we visited all nodes that were uploaded. + // If not, we might have received multiple disconnected graphs. + + // The list of directories we know we need to visit. + // Once we're finished working, and (there's no errors), this in reversed order will + // be a valid insertion order, and directories_and_sizes will be empty. + let mut dirs_to_visit = Vec::with_capacity(self.directories_and_sizes.len()); + dirs_to_visit.push( + self.directories_and_sizes + .remove(&root_digest) + .expect("root digest not found") + .0, + ); + // The set of digests seen while going visiting all directories. + let mut seen_dir_digests = HashSet::new(); + + // This loop moves gradually to the end of `dirs_to_visit`, while it's being + // extended. + // The loop stops once it reaches the end. + let mut i = 0; + while let Some(directory) = dirs_to_visit.get(i).map(|d| d.to_owned()) { + // lookup all child directories and put them in the back of dirs_to_visit, + // if they're not already there. + for child_dir in &directory.directories { + let child_digest = B3Digest::try_from(child_dir.digest.to_owned()).unwrap(); // validated + + // In case the digest is neither already visited nor already in dirs_to_visit, + // add it to the end of it. + // We don't need to check for the hash we're currently + // visiting, as we can't self-reference. + if !seen_dir_digests.contains(&child_digest) { + dirs_to_visit.push( + self.directories_and_sizes + .remove(&child_digest) + .expect("child digest not found") + .0, + ); + seen_dir_digests.insert(child_digest); + } + } + + i += 1; + } + + // check directories_and_sizes is empty. + if !self.directories_and_sizes.is_empty() { + if cfg!(debug_assertions) { + return Err(Error::InvalidRequest(format!( + "found {} disconnected nodes: {:?}", + self.directories_and_sizes.len(), + self.directories_and_sizes + ))); + } else { + return Err(Error::InvalidRequest(format!( + "found {} disconnected nodes", + self.directories_and_sizes.len() + ))); + } + } + + // Reverse to have correct insertion order. + dirs_to_visit.reverse(); + + Ok(dirs_to_visit) + } +} + +#[cfg(test)] +mod tests { + use crate::{ + fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}, + proto::{self, Directory}, + }; + use lazy_static::lazy_static; + use test_case::test_case; + + lazy_static! { + pub static ref BROKEN_DIRECTORY : Directory = Directory { + symlinks: vec![proto::SymlinkNode { + name: "".into(), // invalid name! + target: "doesntmatter".into(), + }], + ..Default::default() + }; + + pub static ref BROKEN_PARENT_DIRECTORY: Directory = Directory { + directories: vec![proto::DirectoryNode { + name: "foo".into(), + digest: DIRECTORY_A.digest().into(), + size: DIRECTORY_A.size() + 42, // wrong! + }], + ..Default::default() + }; + } + + use super::ClosureValidator; + + /// Uploading an empty directory should succeed. + #[test_case(vec![&DIRECTORY_A], false, Some(vec![&DIRECTORY_A]); "empty directory")] + /// Uploading A, then B (referring to A) should succeed. + #[test_case(vec![&DIRECTORY_A, &DIRECTORY_B], false, Some(vec![&DIRECTORY_A, &DIRECTORY_B]); "simple closure")] + /// Uploading A, then A, then C (referring to A twice) should succeed. + /// We pretend to be a dumb client not deduping directories. + #[test_case(vec![&DIRECTORY_A, &DIRECTORY_A, &DIRECTORY_C], false, Some(vec![&DIRECTORY_A, &DIRECTORY_C]); "same child")] + /// Uploading A, then C (referring to A twice) should succeed. + #[test_case(vec![&DIRECTORY_A, &DIRECTORY_C], false, Some(vec![&DIRECTORY_A, &DIRECTORY_C]); "same child dedup")] + /// Uploading A, then C (referring to A twice), then B (itself referring to A) should fail during close, + /// as B itself would be left unconnected. + #[test_case(vec![&DIRECTORY_A, &DIRECTORY_C, &DIRECTORY_B], false, None; "unconnected node")] + /// Uploading B (referring to A) should fail immediately, because A was never uploaded. + #[test_case(vec![&DIRECTORY_B], true, None; "dangling pointer")] + /// Uploading a directory failing validation should fail immediately. + #[test_case(vec![&BROKEN_DIRECTORY], true, None; "failing validation")] + /// Uploading a directory which refers to another Directory with a wrong size should fail. + #[test_case(vec![&DIRECTORY_A, &BROKEN_PARENT_DIRECTORY], true, None; "wrong size in parent")] + fn test_uploads( + directories_to_upload: Vec<&Directory>, + exp_fail_upload_last: bool, + exp_finalize: Option>, // Some(_) if finalize successful, None if not. + ) { + let mut dcv = ClosureValidator::default(); + let len_directories_to_upload = directories_to_upload.len(); + + for (i, d) in directories_to_upload.iter().enumerate() { + let resp = dcv.add((*d).clone()); + if i == len_directories_to_upload - 1 && exp_fail_upload_last { + assert!(resp.is_err(), "expect last put to fail"); + + // We don't really care anymore what finalize() would return, as + // the add() failed. + return; + } else { + assert!(resp.is_ok(), "expect put to succeed"); + } + } + + // everything was uploaded successfully. Test drain(). + let resp = dcv.finalize(); + + match exp_finalize { + Some(exp_drain) => { + assert_eq!( + Vec::from_iter(exp_drain.into_iter().map(|e| e.to_owned())), + resp.expect("drain should succeed") + ); + } + None => { + resp.expect_err("drain should fail"); + } + } + } +} diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs index ca8e60e87..b6d1c2fca 100644 --- a/tvix/castore/src/directoryservice/mod.rs +++ b/tvix/castore/src/directoryservice/mod.rs @@ -2,6 +2,7 @@ use crate::{proto, B3Digest, Error}; use futures::stream::BoxStream; use tonic::async_trait; +mod closure_validator; mod from_addr; mod grpc; mod memory; @@ -9,6 +10,7 @@ mod sled; mod traverse; mod utils; +pub use self::closure_validator::ClosureValidator; pub use self::from_addr::from_addr; pub use self::grpc::GRPCDirectoryService; pub use self::memory::MemoryDirectoryService;