diff --git a/tvix/castore/src/directoryservice/closure_validator.rs b/tvix/castore/src/directoryservice/closure_validator.rs deleted file mode 100644 index b9746a5a0..000000000 --- a/tvix/castore/src/directoryservice/closure_validator.rs +++ /dev/null @@ -1,309 +0,0 @@ -use std::collections::{HashMap, HashSet}; - -use bstr::ByteSlice; - -use petgraph::{ - graph::{DiGraph, NodeIndex}, - visit::{Bfs, Walker}, -}; -use tracing::instrument; - -use crate::{ - proto::{self, Directory}, - B3Digest, Error, -}; - -type DirectoryGraph = DiGraph; - -/// This can be used to validate a Directory closure (DAG of connected -/// Directories), and their insertion order. -/// -/// Directories need to be inserted (via `add`), in an order from the leaves to -/// the root (DFS Post-Order). -/// During insertion, We validate as much as we can at that time: -/// -/// - individual validation of Directory messages -/// - validation of insertion order (no upload of not-yet-known Directories) -/// - validation of size fields of referred Directories -/// -/// Internally it keeps all received Directories in a directed graph, -/// with node weights being the Directories and edges pointing to child -/// directories. -/// -/// Once all Directories have been inserted, a finalize function can be -/// called to get a (deduplicated and) validated list of directories, in -/// insertion order. -/// During finalize, a check for graph connectivity is performed too, to ensure -/// there's no disconnected components, and only one root. -#[derive(Default)] -pub struct ClosureValidator { - // A directed graph, using Directory as node weight, without edge weights. - // Edges point from parents to children. - graph: DirectoryGraph, - - // A lookup table from directory digest to node index. - digest_to_node_ix: HashMap, - - /// Keeps track of the last-inserted directory graph node index. - /// On a correct insert, this will be the root node, from which the DFS post - /// order traversal will start from. - last_directory_ix: Option, -} - -impl ClosureValidator { - /// Insert a new Directory into the closure. - /// Perform individual Directory validation, validation of insertion order - /// and size fields. - #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)] - pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> { - let digest = directory.digest(); - - // If we already saw this node previously, it's already validated and in the graph. - if self.digest_to_node_ix.contains_key(&digest) { - return Ok(()); - } - - // Do some general validation - directory - .validate() - .map_err(|e| Error::InvalidRequest(e.to_string()))?; - - // Ensure the directory only refers to directories which we already accepted. - // We lookup their node indices and add them to a HashSet. - let mut child_ixs = HashSet::new(); - for dir in &directory.directories { - let child_digest = B3Digest::try_from(dir.digest.to_owned()).unwrap(); // validated - - // Ensure the digest has already been seen - let child_ix = *self.digest_to_node_ix.get(&child_digest).ok_or_else(|| { - Error::InvalidRequest(format!( - "'{}' refers to unseen child dir: {}", - dir.name.as_bstr(), - &child_digest - )) - })?; - - // Ensure the size specified in the child node matches the directory size itself. - let recorded_child_size = self - .graph - .node_weight(child_ix) - .expect("node not found") - .size(); - - // Ensure the size specified in the child node matches our records. - if dir.size != recorded_child_size { - return Err(Error::InvalidRequest(format!( - "'{}' has wrong size, specified {}, recorded {}", - dir.name.as_bstr(), - dir.size, - recorded_child_size - ))); - } - - child_ixs.insert(child_ix); - } - - // Insert node into the graph, and add edges to all children. - let node_ix = self.graph.add_node(directory); - for child_ix in child_ixs { - self.graph.add_edge(node_ix, child_ix, ()); - } - - // Record the mapping from digest to node_ix in our lookup table. - self.digest_to_node_ix.insert(digest, node_ix); - - // Update last_directory_ix. - self.last_directory_ix = Some(node_ix); - - Ok(()) - } - - /// Ensure that all inserted Directories are connected, then return a - /// (deduplicated) and validated list of directories, in from-leaves-to-root - /// order. - /// In case no elements have been inserted, returns an empty list. - #[instrument(level = "trace", skip_all, err)] - pub(crate) fn finalize(self) -> Result, Error> { - let (graph, _) = match self.finalize_raw()? { - None => return Ok(vec![]), - Some(v) => v, - }; - // Dissolve the graph, returning the nodes as a Vec. - // As the graph was populated in a valid DFS PostOrder, we can return - // nodes in that same order. - let (nodes, _edges) = graph.into_nodes_edges(); - Ok(nodes.into_iter().map(|x| x.weight).collect()) - } - - /// Ensure that all inserted Directories are connected, then return a - /// (deduplicated) and validated list of directories, in from-root-to-leaves - /// order. - /// In case no elements have been inserted, returns an empty list. - #[instrument(level = "trace", skip_all, err)] - pub(crate) fn finalize_root_to_leaves(self) -> Result, Error> { - let (graph, root) = match self.finalize_raw()? { - None => return Ok(vec![]), - Some(v) => v, - }; - - // do a BFS traversal of the graph, starting with the root node to get - // all nodes reachable from there. - let traversal = Bfs::new(&graph, root); - - let order = traversal.iter(&graph).collect::>(); - - let (nodes, _edges) = graph.into_nodes_edges(); - - // Convert to option, so that we can take individual nodes out without messing up the - // indices - let mut nodes = nodes.into_iter().map(Some).collect::>(); - - Ok(order - .iter() - .map(|i| nodes[i.index()].take().unwrap().weight) - .collect()) - } - - /// Internal implementation of closure validation - #[instrument(level = "trace", skip_all, err)] - fn finalize_raw(self) -> Result, Error> { - // If no nodes were inserted, an empty list is returned. - let last_directory_ix = if let Some(x) = self.last_directory_ix { - x - } else { - return Ok(None); - }; - - // do a BFS traversal of the graph, starting with the root node to get - // (the count of) all nodes reachable from there. - let mut traversal = Bfs::new(&self.graph, last_directory_ix); - - let mut visited_directory_count = 0; - #[cfg(debug_assertions)] - let mut visited_directory_ixs = HashSet::new(); - #[cfg_attr(not(debug_assertions), allow(unused))] - while let Some(directory_ix) = traversal.next(&self.graph) { - #[cfg(debug_assertions)] - visited_directory_ixs.insert(directory_ix); - - visited_directory_count += 1; - } - - // If the number of nodes collected equals the total number of nodes in - // the graph, we know all nodes are connected. - if visited_directory_count != self.graph.node_count() { - // more or less exhaustive error reporting. - #[cfg(debug_assertions)] - { - let all_directory_ixs: HashSet<_> = self.graph.node_indices().collect(); - - let unvisited_directories: HashSet<_> = all_directory_ixs - .difference(&visited_directory_ixs) - .map(|ix| self.graph.node_weight(*ix).expect("node not found")) - .collect(); - - return Err(Error::InvalidRequest(format!( - "found {} disconnected directories: {:?}", - self.graph.node_count() - visited_directory_ixs.len(), - unvisited_directories - ))); - } - #[cfg(not(debug_assertions))] - { - return Err(Error::InvalidRequest(format!( - "found {} disconnected directories", - self.graph.node_count() - visited_directory_count - ))); - } - } - - Ok(Some((self.graph, last_directory_ix))) - } -} - -#[cfg(test)] -mod tests { - use crate::{ - fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}, - proto::{self, Directory}, - }; - use lazy_static::lazy_static; - use rstest::rstest; - - lazy_static! { - pub static ref BROKEN_DIRECTORY : Directory = Directory { - symlinks: vec![proto::SymlinkNode { - name: "".into(), // invalid name! - target: "doesntmatter".into(), - }], - ..Default::default() - }; - - pub static ref BROKEN_PARENT_DIRECTORY: Directory = Directory { - directories: vec![proto::DirectoryNode { - name: "foo".into(), - digest: DIRECTORY_A.digest().into(), - size: DIRECTORY_A.size() + 42, // wrong! - }], - ..Default::default() - }; - } - - use super::ClosureValidator; - - #[rstest] - /// Uploading an empty directory should succeed. - #[case::empty_directory(&[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))] - /// Uploading A, then B (referring to A) should succeed. - #[case::simple_closure(&[&*DIRECTORY_A, &*DIRECTORY_B], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_B]))] - /// Uploading A, then A, then C (referring to A twice) should succeed. - /// We pretend to be a dumb client not deduping directories. - #[case::same_child(&[&*DIRECTORY_A, &*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))] - /// Uploading A, then C (referring to A twice) should succeed. - #[case::same_child_dedup(&[&*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))] - /// Uploading A, then C (referring to A twice), then B (itself referring to A) should fail during close, - /// as B itself would be left unconnected. - #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, None)] - /// Uploading B (referring to A) should fail immediately, because A was never uploaded. - #[case::dangling_pointer(&[&*DIRECTORY_B], true, None)] - /// Uploading a directory failing validation should fail immediately. - #[case::failing_validation(&[&*BROKEN_DIRECTORY], true, None)] - /// Uploading a directory which refers to another Directory with a wrong size should fail. - #[case::wrong_size_in_parent(&[&*DIRECTORY_A, &*BROKEN_PARENT_DIRECTORY], true, None)] - fn test_uploads( - #[case] directories_to_upload: &[&Directory], - #[case] exp_fail_upload_last: bool, - #[case] exp_finalize: Option>, // Some(_) if finalize successful, None if not. - ) { - let mut dcv = ClosureValidator::default(); - let len_directories_to_upload = directories_to_upload.len(); - - for (i, d) in directories_to_upload.iter().enumerate() { - let resp = dcv.add((*d).clone()); - if i == len_directories_to_upload - 1 && exp_fail_upload_last { - assert!(resp.is_err(), "expect last put to fail"); - - // We don't really care anymore what finalize() would return, as - // the add() failed. - return; - } else { - assert!(resp.is_ok(), "expect put to succeed"); - } - } - - // everything was uploaded successfully. Test finalize(). - let resp = dcv.finalize(); - - match exp_finalize { - Some(directories) => { - assert_eq!( - Vec::from_iter(directories.iter().map(|e| (*e).to_owned())), - resp.expect("drain should succeed") - ); - } - None => { - resp.expect_err("drain should fail"); - } - } - } -} diff --git a/tvix/castore/src/directoryservice/directory_graph.rs b/tvix/castore/src/directoryservice/directory_graph.rs new file mode 100644 index 000000000..e6b9b1633 --- /dev/null +++ b/tvix/castore/src/directoryservice/directory_graph.rs @@ -0,0 +1,413 @@ +use std::collections::HashMap; + +use bstr::ByteSlice; + +use petgraph::{ + graph::{DiGraph, NodeIndex}, + visit::{Bfs, DfsPostOrder, EdgeRef, IntoNodeIdentifiers, Walker}, + Direction, Incoming, +}; +use tracing::instrument; + +use super::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator}; +use crate::{ + proto::{self, Directory, DirectoryNode}, + B3Digest, +}; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("{0}")] + ValidationError(String), +} + +/// This can be used to validate and/or re-order a Directory closure (DAG of +/// connected Directories), and their insertion order. +/// +/// The DirectoryGraph is parametrized on the insertion order, and can be +/// constructed using the Default trait, or using `with_order` if the +/// OrderValidator needs to be customized. +/// +/// If the user is receiving directories from canonical protobuf encoding in +/// root-to-leaves order, and parsing them, she can call `digest_allowed` +/// _before_ parsing the protobuf record and then add it with `add_unchecked`. +/// All other users insert the directories via `add`, in their specified order. +/// During insertion, we validate as much as we can at that time: +/// +/// - individual validation of Directory messages +/// - validation of insertion order +/// - validation of size fields of referred Directories +/// +/// Internally it keeps all received Directories in a directed graph, +/// with node weights being the Directories and edges pointing to child/parent +/// directories. +/// +/// Once all Directories have been inserted, a validate function can be +/// called to perform a check for graph connectivity and ensure there's no +/// disconnected components or missing nodes. +/// Finally, the `drain_leaves_to_root` or `drain_root_to_leaves` can be +/// _chained_ on validate to get an iterator over the (deduplicated and) +/// validated list of directories in either order. +#[derive(Default)] +pub struct DirectoryGraph { + // A directed graph, using Directory as node weight. + // Edges point from parents to children. + // + // Nodes with None weigths might exist when a digest has been referred to but the directory + // with this digest has not yet been sent. + // + // The option in the edge weight tracks the pending validation state of the respective edge, for example if + // the child has not been added yet. + graph: DiGraph, Option>, + + // A lookup table from directory digest to node index. + digest_to_node_ix: HashMap, + + order_validator: O, +} + +pub struct ValidatedDirectoryGraph { + graph: DiGraph, Option>, + + root: Option, +} + +fn check_edge(dir: &DirectoryNode, child: &Directory) -> Result<(), Error> { + // Ensure the size specified in the child node matches our records. + if dir.size != child.size() { + return Err(Error::ValidationError(format!( + "'{}' has wrong size, specified {}, recorded {}", + dir.name.as_bstr(), + dir.size, + child.size(), + ))); + } + Ok(()) +} + +impl DirectoryGraph { + /// Insert a new Directory into the closure + #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)] + pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> { + if !self.order_validator.add_directory(&directory) { + return Err(Error::ValidationError( + "unknown directory was referenced".into(), + )); + } + self.add_order_unchecked(directory) + } +} + +impl DirectoryGraph { + /// If the user is parsing directories from canonical protobuf encoding, she can + /// call `digest_allowed` _before_ parsing the protobuf record and then add it + /// with `add_unchecked`. + pub fn digest_allowed(&self, digest: B3Digest) -> bool { + self.order_validator.digest_allowed(&digest) + } + + /// Insert a new Directory into the closure + #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)] + pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> { + let digest = directory.digest(); + if !self.order_validator.digest_allowed(&digest) { + return Err(Error::ValidationError("unexpected digest".into())); + } + self.order_validator.add_directory_unchecked(&directory); + self.add_order_unchecked(directory) + } +} + +impl DirectoryGraph { + /// Customize the ordering, i.e. for pre-setting the root of the RootToLeavesValidator + pub fn with_order(order_validator: O) -> Self { + Self { + graph: Default::default(), + digest_to_node_ix: Default::default(), + order_validator, + } + } + + /// Adds a directory which has already been confirmed to be in-order to the graph + pub fn add_order_unchecked(&mut self, directory: proto::Directory) -> Result<(), Error> { + // Do some basic validation + directory + .validate() + .map_err(|e| Error::ValidationError(e.to_string()))?; + + let digest = directory.digest(); + + // Teach the graph about the existence of a node with this digest + let ix = *self + .digest_to_node_ix + .entry(digest) + .or_insert_with(|| self.graph.add_node(None)); + + if self.graph[ix].is_some() { + // The node is already in the graph, there is nothing to do here. + return Ok(()); + } + + // set up edges to all child directories + for subdir in &directory.directories { + let subdir_digest: B3Digest = subdir.digest.clone().try_into().unwrap(); + + let child_ix = *self + .digest_to_node_ix + .entry(subdir_digest) + .or_insert_with(|| self.graph.add_node(None)); + + let pending_edge_check = match &self.graph[child_ix] { + Some(child) => { + // child is already available, validate the edge now + check_edge(subdir, child)?; + None + } + None => Some(subdir.clone()), // pending validation + }; + self.graph.add_edge(ix, child_ix, pending_edge_check); + } + + // validate the edges from parents to this node + // this collects edge ids in a Vec because there is no edges_directed_mut :'c + for edge_id in self + .graph + .edges_directed(ix, Direction::Incoming) + .map(|edge_ref| edge_ref.id()) + .collect::>() + .into_iter() + { + let edge_weight = self + .graph + .edge_weight_mut(edge_id) + .expect("edge not found") + .take() + .expect("edge is already validated"); + check_edge(&edge_weight, &directory)?; + } + + // finally, store the directory information in the node weight + self.graph[ix] = Some(directory); + + Ok(()) + } + + #[instrument(level = "trace", skip_all, err)] + pub fn validate(self) -> Result { + // find all initial nodes (nodes without incoming edges) + let mut roots = self + .graph + .node_identifiers() + .filter(|&a| self.graph.neighbors_directed(a, Incoming).next().is_none()); + + let root = roots.next(); + if roots.next().is_some() { + return Err(Error::ValidationError( + "graph has disconnected roots".into(), + )); + } + + // test that the graph is complete + if self.graph.raw_nodes().iter().any(|n| n.weight.is_none()) { + return Err(Error::ValidationError("graph is incomplete".into())); + } + + Ok(ValidatedDirectoryGraph { + graph: self.graph, + root, + }) + } +} + +impl ValidatedDirectoryGraph { + /// Return the list of directories in from-root-to-leaves order. + /// In case no elements have been inserted, returns an empty list. + /// + /// panics if the specified root is not in the graph + #[instrument(level = "trace", skip_all)] + pub fn drain_root_to_leaves(self) -> impl Iterator { + let order = match self.root { + Some(root) => { + // do a BFS traversal of the graph, starting with the root node + Bfs::new(&self.graph, root) + .iter(&self.graph) + .collect::>() + } + None => vec![], // No nodes have been inserted, do not traverse + }; + + let (mut nodes, _edges) = self.graph.into_nodes_edges(); + + order + .into_iter() + .filter_map(move |i| nodes[i.index()].weight.take()) + } + + /// Return the list of directories in from-leaves-to-root order. + /// In case no elements have been inserted, returns an empty list. + /// + /// panics when the specified root is not in the graph + #[instrument(level = "trace", skip_all)] + pub fn drain_leaves_to_root(self) -> impl Iterator { + let order = match self.root { + Some(root) => { + // do a DFS Post-Order traversal of the graph, starting with the root node + DfsPostOrder::new(&self.graph, root) + .iter(&self.graph) + .collect::>() + } + None => vec![], // No nodes have been inserted, do not traverse + }; + + let (mut nodes, _edges) = self.graph.into_nodes_edges(); + + order + .into_iter() + .filter_map(move |i| nodes[i.index()].weight.take()) + } +} + +#[cfg(test)] +mod tests { + use crate::{ + fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}, + proto::{self, Directory}, + }; + use lazy_static::lazy_static; + use rstest::rstest; + + lazy_static! { + pub static ref BROKEN_DIRECTORY : Directory = Directory { + symlinks: vec![proto::SymlinkNode { + name: "".into(), // invalid name! + target: "doesntmatter".into(), + }], + ..Default::default() + }; + + pub static ref BROKEN_PARENT_DIRECTORY: Directory = Directory { + directories: vec![proto::DirectoryNode { + name: "foo".into(), + digest: DIRECTORY_A.digest().into(), + size: DIRECTORY_A.size() + 42, // wrong! + }], + ..Default::default() + }; + } + + use super::{DirectoryGraph, LeavesToRootValidator, RootToLeavesValidator}; + + #[rstest] + /// Uploading an empty directory should succeed. + #[case::empty_directory(&[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))] + /// Uploading A, then B (referring to A) should succeed. + #[case::simple_closure(&[&*DIRECTORY_A, &*DIRECTORY_B], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_B]))] + /// Uploading A, then A, then C (referring to A twice) should succeed. + /// We pretend to be a dumb client not deduping directories. + #[case::same_child(&[&*DIRECTORY_A, &*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))] + /// Uploading A, then C (referring to A twice) should succeed. + #[case::same_child_dedup(&[&*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))] + /// Uploading A, then C (referring to A twice), then B (itself referring to A) should fail during close, + /// as B itself would be left unconnected. + #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, None)] + /// Uploading B (referring to A) should fail immediately, because A was never uploaded. + #[case::dangling_pointer(&[&*DIRECTORY_B], true, None)] + /// Uploading a directory failing validation should fail immediately. + #[case::failing_validation(&[&*BROKEN_DIRECTORY], true, None)] + /// Uploading a directory which refers to another Directory with a wrong size should fail. + #[case::wrong_size_in_parent(&[&*DIRECTORY_A, &*BROKEN_PARENT_DIRECTORY], true, None)] + fn test_uploads( + #[case] directories_to_upload: &[&Directory], + #[case] exp_fail_upload_last: bool, + #[case] exp_finalize: Option>, // Some(_) if finalize successful, None if not. + ) { + let mut dcv = DirectoryGraph::::default(); + let len_directories_to_upload = directories_to_upload.len(); + + for (i, d) in directories_to_upload.iter().enumerate() { + let resp = dcv.add((*d).clone()); + if i == len_directories_to_upload - 1 && exp_fail_upload_last { + assert!(resp.is_err(), "expect last put to fail"); + + // We don't really care anymore what finalize() would return, as + // the add() failed. + return; + } else { + assert!(resp.is_ok(), "expect put to succeed"); + } + } + + // everything was uploaded successfully. Test finalize(). + let resp = dcv + .validate() + .map(|validated| validated.drain_leaves_to_root().collect::>()); + + match exp_finalize { + Some(directories) => { + assert_eq!( + Vec::from_iter(directories.iter().map(|e| (*e).to_owned())), + resp.expect("drain should succeed") + ); + } + None => { + resp.expect_err("drain should fail"); + } + } + } + + #[rstest] + /// Downloading an empty directory should succeed. + #[case::empty_directory(&*DIRECTORY_A, &[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))] + /// Downlading B, then A (referenced by B) should succeed. + #[case::simple_closure(&*DIRECTORY_B, &[&*DIRECTORY_B, &*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_B]))] + /// Downloading C (referring to A twice), then A should succeed. + #[case::same_child_dedup(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))] + /// Downloading C, then B (both referring to A but not referring to each other) should fail immediately as B has no connection to C (the root) + #[case::unconnected_node(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_B], true, None)] + /// Downloading B (specified as the root) but receiving A instead should fail immediately, because A has no connection to B (the root). + #[case::dangling_pointer(&*DIRECTORY_B, &[&*DIRECTORY_A], true, None)] + /// Downloading a directory failing validation should fail immediately. + #[case::failing_validation(&*BROKEN_DIRECTORY, &[&*BROKEN_DIRECTORY], true, None)] + /// Downloading a directory which refers to another Directory with a wrong size should fail. + #[case::wrong_size_in_parent(&*BROKEN_PARENT_DIRECTORY, &[&*BROKEN_PARENT_DIRECTORY, &*DIRECTORY_A], true, None)] + fn test_downloads( + #[case] root: &Directory, + #[case] directories_to_upload: &[&Directory], + #[case] exp_fail_upload_last: bool, + #[case] exp_finalize: Option>, // Some(_) if finalize successful, None if not. + ) { + let mut dcv = + DirectoryGraph::with_order(RootToLeavesValidator::new_with_root_digest(root.digest())); + let len_directories_to_upload = directories_to_upload.len(); + + for (i, d) in directories_to_upload.iter().enumerate() { + let resp = dcv.add((*d).clone()); + if i == len_directories_to_upload - 1 && exp_fail_upload_last { + assert!(resp.is_err(), "expect last put to fail"); + + // We don't really care anymore what finalize() would return, as + // the add() failed. + return; + } else { + assert!(resp.is_ok(), "expect put to succeed"); + } + } + + // everything was uploaded successfully. Test finalize(). + let resp = dcv + .validate() + .map(|validated| validated.drain_leaves_to_root().collect::>()); + + match exp_finalize { + Some(directories) => { + assert_eq!( + Vec::from_iter(directories.iter().map(|e| (*e).to_owned())), + resp.expect("drain should succeed") + ); + } + None => { + resp.expect_err("drain should fail"); + } + } + } +} diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs index 3f180ef16..0d717a8a3 100644 --- a/tvix/castore/src/directoryservice/mod.rs +++ b/tvix/castore/src/directoryservice/mod.rs @@ -2,11 +2,12 @@ use crate::{proto, B3Digest, Error}; use futures::stream::BoxStream; use tonic::async_trait; -mod closure_validator; +mod directory_graph; mod from_addr; mod grpc; mod memory; mod object_store; +mod order_validator; mod simple_putter; mod sled; #[cfg(test)] @@ -14,11 +15,12 @@ pub mod tests; mod traverse; mod utils; -pub use self::closure_validator::ClosureValidator; +pub use self::directory_graph::DirectoryGraph; pub use self::from_addr::from_addr; pub use self::grpc::GRPCDirectoryService; pub use self::memory::MemoryDirectoryService; pub use self::object_store::ObjectStoreDirectoryService; +pub use self::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator}; pub use self::simple_putter::SimplePutter; pub use self::sled::SledDirectoryService; pub use self::traverse::descend_to; diff --git a/tvix/castore/src/directoryservice/object_store.rs b/tvix/castore/src/directoryservice/object_store.rs index 64ce335ed..90e53f928 100644 --- a/tvix/castore/src/directoryservice/object_store.rs +++ b/tvix/castore/src/directoryservice/object_store.rs @@ -16,7 +16,7 @@ use tonic::async_trait; use tracing::{instrument, trace, warn, Level}; use url::Url; -use super::{ClosureValidator, DirectoryPutter, DirectoryService}; +use super::{DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator}; use crate::{proto, B3Digest, Error}; /// Stores directory closures in an object store. @@ -177,7 +177,7 @@ struct ObjectStoreDirectoryPutter { object_store: Arc, base_path: Path, - directory_validator: Option, + directory_validator: Option>, } impl ObjectStoreDirectoryPutter { @@ -197,7 +197,9 @@ impl DirectoryPutter for ObjectStoreDirectoryPutter { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { - validator.add(directory)?; + validator + .add(directory) + .map_err(|e| Error::StorageError(e.to_string()))?; } } @@ -214,7 +216,11 @@ impl DirectoryPutter for ObjectStoreDirectoryPutter { // retrieve the validated directories. // It is important that they are in topological order (root first), // since that's how we want to retrieve them from the object store in the end. - let directories = validator.finalize_root_to_leaves()?; + let directories = validator + .validate() + .map_err(|e| Error::StorageError(e.to_string()))? + .drain_root_to_leaves() + .collect::>(); // Get the root digest let root_digest = directories diff --git a/tvix/castore/src/directoryservice/order_validator.rs b/tvix/castore/src/directoryservice/order_validator.rs new file mode 100644 index 000000000..6045f5d24 --- /dev/null +++ b/tvix/castore/src/directoryservice/order_validator.rs @@ -0,0 +1,181 @@ +use std::collections::HashSet; +use tracing::warn; + +use crate::{proto::Directory, B3Digest}; + +pub trait OrderValidator { + /// Update the order validator's state with the directory + /// Returns whether the directory was accepted + fn add_directory(&mut self, directory: &Directory) -> bool; +} + +#[derive(Default)] +/// Validates that newly introduced directories are already referenced from +/// the root via existing directories. +/// Commonly used when _receiving_ a directory closure _from_ a store. +pub struct RootToLeavesValidator { + /// Only used to remember the root node, not for validation + expected_digests: HashSet, +} + +impl RootToLeavesValidator { + /// Use to validate the root digest of the closure upon receiving the first + /// directory. + pub fn new_with_root_digest(root_digest: B3Digest) -> Self { + let mut this = Self::default(); + this.expected_digests.insert(root_digest); + this + } + + /// Checks if a directory is in-order based on its digest. + /// + /// Particularly useful when receiving directories in canonical protobuf + /// encoding, so that directories not connected to the root can be rejected + /// without parsing. + /// + /// After parsing, the directory must be passed to `add_directory_unchecked` + /// to add its children to the list of expected digests. + pub fn digest_allowed(&self, digest: &B3Digest) -> bool { + self.expected_digests.is_empty() // we don't know the root node; allow any + || self.expected_digests.contains(digest) + } + + /// Update the order validator's state with the directory + pub fn add_directory_unchecked(&mut self, directory: &Directory) { + // No initial root was specified and this is the first directory + if self.expected_digests.is_empty() { + self.expected_digests.insert(directory.digest()); + } + + for subdir in &directory.directories { + // Allow the children to appear next + let subdir_digest = subdir.digest.clone().try_into().unwrap(); + self.expected_digests.insert(subdir_digest); + } + } +} + +impl OrderValidator for RootToLeavesValidator { + fn add_directory(&mut self, directory: &Directory) -> bool { + if !self.digest_allowed(&directory.digest()) { + return false; + } + self.add_directory_unchecked(directory); + true + } +} + +#[derive(Default)] +/// Validates that newly uploaded directories only reference directories which +/// have already been introduced. +/// Commonly used when _uploading_ a directory closure _to_ a store. +pub struct LeavesToRootValidator { + /// This is empty in the beginning, and gets filled as leaves and intermediates are + /// inserted + allowed_references: HashSet, +} + +impl OrderValidator for LeavesToRootValidator { + fn add_directory(&mut self, directory: &Directory) -> bool { + let digest = directory.digest(); + + for subdir in &directory.directories { + let subdir_digest = subdir.digest.clone().try_into().unwrap(); // this has been validated in validate_directory() + if !self.allowed_references.contains(&subdir_digest) { + warn!( + directory.digest = %digest, + subdirectory.digest = %subdir_digest, + "unexpected directory reference" + ); + return false; + } + } + + self.allowed_references.insert(digest.clone()); + + true + } +} + +#[cfg(test)] +mod tests { + use super::{LeavesToRootValidator, RootToLeavesValidator}; + use crate::directoryservice::order_validator::OrderValidator; + use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}; + use crate::proto::Directory; + use rstest::rstest; + + #[rstest] + /// Uploading an empty directory should succeed. + #[case::empty_directory(&[&*DIRECTORY_A], false)] + /// Uploading A, then B (referring to A) should succeed. + #[case::simple_closure(&[&*DIRECTORY_A, &*DIRECTORY_B], false)] + /// Uploading A, then A, then C (referring to A twice) should succeed. + /// We pretend to be a dumb client not deduping directories. + #[case::same_child(&[&*DIRECTORY_A, &*DIRECTORY_A, &*DIRECTORY_C], false)] + /// Uploading A, then C (referring to A twice) should succeed. + #[case::same_child_dedup(&[&*DIRECTORY_A, &*DIRECTORY_C], false)] + /// Uploading A, then C (referring to A twice), then B (itself referring to A) should fail during close, + /// as B itself would be left unconnected. + #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false)] + /// Uploading B (referring to A) should fail immediately, because A was never uploaded. + #[case::dangling_pointer(&[&*DIRECTORY_B], true)] + fn leaves_to_root( + #[case] directories_to_upload: &[&Directory], + #[case] exp_fail_upload_last: bool, + ) { + let mut validator = LeavesToRootValidator::default(); + let len_directories_to_upload = directories_to_upload.len(); + + for (i, d) in directories_to_upload.iter().enumerate() { + let resp = validator.add_directory(d); + if i == len_directories_to_upload - 1 && exp_fail_upload_last { + assert!(!resp, "expect last put to fail"); + + // We don't really care anymore what finalize() would return, as + // the add() failed. + return; + } else { + assert!(resp, "expect put to succeed"); + } + } + } + + #[rstest] + /// Downloading an empty directory should succeed. + #[case::empty_directory(&*DIRECTORY_A, &[&*DIRECTORY_A], false)] + /// Downlading B, then A (referenced by B) should succeed. + #[case::simple_closure(&*DIRECTORY_B, &[&*DIRECTORY_B, &*DIRECTORY_A], false)] + /// Downloading C (referring to A twice), then A should succeed. + #[case::same_child_dedup(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_A], false)] + /// Downloading C, then B (both referring to A but not referring to each other) should fail immediately as B has no connection to C (the root) + #[case::unconnected_node(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_B], true)] + /// Downloading B (specified as the root) but receiving A instead should fail immediately, because A has no connection to B (the root). + #[case::dangling_pointer(&*DIRECTORY_B, &[&*DIRECTORY_A], true)] + fn root_to_leaves( + #[case] root: &Directory, + #[case] directories_to_upload: &[&Directory], + #[case] exp_fail_upload_last: bool, + ) { + let mut validator = RootToLeavesValidator::new_with_root_digest(root.digest()); + let len_directories_to_upload = directories_to_upload.len(); + + for (i, d) in directories_to_upload.iter().enumerate() { + let resp1 = validator.digest_allowed(&d.digest()); + let resp = validator.add_directory(d); + assert_eq!( + resp1, resp, + "digest_allowed should return the same value as add_directory" + ); + if i == len_directories_to_upload - 1 && exp_fail_upload_last { + assert!(!resp, "expect last put to fail"); + + // We don't really care anymore what finalize() would return, as + // the add() failed. + return; + } else { + assert!(resp, "expect put to succeed"); + } + } + } +} diff --git a/tvix/castore/src/directoryservice/simple_putter.rs b/tvix/castore/src/directoryservice/simple_putter.rs index 25617ebca..dc54e3d11 100644 --- a/tvix/castore/src/directoryservice/simple_putter.rs +++ b/tvix/castore/src/directoryservice/simple_putter.rs @@ -1,6 +1,6 @@ -use super::ClosureValidator; use super::DirectoryPutter; use super::DirectoryService; +use super::{DirectoryGraph, LeavesToRootValidator}; use crate::proto; use crate::B3Digest; use crate::Error; @@ -14,7 +14,7 @@ use tracing::warn; pub struct SimplePutter { directory_service: DS, - directory_validator: Option, + directory_validator: Option>, } impl SimplePutter { @@ -33,7 +33,9 @@ impl DirectoryPutter for SimplePutter { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { - validator.add(directory)?; + validator + .add(directory) + .map_err(|e| Error::StorageError(e.to_string()))?; } } @@ -46,7 +48,11 @@ impl DirectoryPutter for SimplePutter { None => Err(Error::InvalidRequest("already closed".to_string())), Some(validator) => { // retrieve the validated directories. - let directories = validator.finalize()?; + let directories = validator + .validate() + .map_err(|e| Error::StorageError(e.to_string()))? + .drain_leaves_to_root() + .collect::>(); // Get the root digest, which is at the end (cf. insertion order) let root_digest = directories diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs index 9490a49c0..bd98ed6b1 100644 --- a/tvix/castore/src/directoryservice/sled.rs +++ b/tvix/castore/src/directoryservice/sled.rs @@ -8,7 +8,7 @@ use tonic::async_trait; use tracing::{instrument, warn}; use super::utils::traverse_directory; -use super::{ClosureValidator, DirectoryPutter, DirectoryService}; +use super::{DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator}; #[derive(Clone)] pub struct SledDirectoryService { @@ -135,7 +135,7 @@ pub struct SledDirectoryPutter { /// The directories (inside the directory validator) that we insert later, /// or None, if they were already inserted. - directory_validator: Option, + directory_validator: Option>, } #[async_trait] @@ -145,7 +145,9 @@ impl DirectoryPutter for SledDirectoryPutter { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { - validator.add(directory)?; + validator + .add(directory) + .map_err(|e| Error::StorageError(e.to_string()))?; } } @@ -162,7 +164,11 @@ impl DirectoryPutter for SledDirectoryPutter { let tree = self.tree.clone(); move || { // retrieve the validated directories. - let directories = validator.finalize()?; + let directories = validator + .validate() + .map_err(|e| Error::StorageError(e.to_string()))? + .drain_leaves_to_root() + .collect::>(); // Get the root digest, which is at the end (cf. insertion order) let root_digest = directories diff --git a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs index 5c1428690..ce1d2bcd2 100644 --- a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs +++ b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs @@ -1,4 +1,5 @@ -use crate::directoryservice::ClosureValidator; +use crate::directoryservice::DirectoryGraph; +use crate::directoryservice::LeavesToRootValidator; use crate::proto; use crate::{directoryservice::DirectoryService, B3Digest}; use futures::stream::BoxStream; @@ -78,14 +79,20 @@ where ) -> Result, Status> { let mut req_inner = request.into_inner(); - // We put all Directory messages we receive into ClosureValidator first. - let mut validator = ClosureValidator::default(); + // We put all Directory messages we receive into DirectoryGraph. + let mut validator = DirectoryGraph::::default(); while let Some(directory) = req_inner.message().await? { - validator.add(directory)?; + validator + .add(directory) + .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?; } // drain, which validates connectivity too. - let directories = validator.finalize()?; + let directories = validator + .validate() + .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))? + .drain_leaves_to_root() + .collect::>(); let mut directory_putter = self.directory_service.put_multiple_start(); for directory in directories {