fix(tvix/castore/directory): fix graph traversal

Use a proper graph library to ensure all nodes are reachable from the
root.

We had a bit of that handrolled during add(), as well as later, which
had an annoying bug:

Redundant nodes were omitted during insert, but when returning the list
during finalize, we did not properly account they need to be introduced
before their parents are sent.

We now simply populate a petgraph DiGraph during insert (skipping
inserting nodes we already saw), and use petgraph's DfsPostOrder to
traverse the graph during finalize.

If the number of returned indices equals the total number of nodes in
the graph, all nodes are reachable from the root, we can consume the
graph and return the nodes as a vec, in the same order as the traversal
(and insertion).

Providing a regression test for the initial bug is challenging, as the
current code uses a bunch of HashSets. I manually tested ingesting a
full NixOS closure using this mechanism (via gRPC, which exposes this
problem, as it validates twice), and it now works.

Change-Id: Ic1d5e3e981f2993cc08c5c6b60ad895e578326dc
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11418
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
This commit is contained in:
Florian Klink 2024-04-13 23:33:18 +03:00 committed by clbot
parent 6ebaa7b88a
commit 9498ac936e
4 changed files with 107 additions and 88 deletions

1
tvix/Cargo.lock generated
View file

@ -4352,6 +4352,7 @@ dependencies = [
"libc", "libc",
"object_store", "object_store",
"parking_lot 0.12.1", "parking_lot 0.12.1",
"petgraph",
"pin-project-lite", "pin-project-lite",
"prost 0.12.3", "prost 0.12.3",
"prost-build", "prost-build",

View file

@ -7768,6 +7768,7 @@ rec {
"serde_derive" = [ "dep:serde_derive" ]; "serde_derive" = [ "dep:serde_derive" ];
"unstable" = [ "generate" ]; "unstable" = [ "generate" ];
}; };
resolvedDefaultFeatures = [ "default" "graphmap" "matrix_graph" "stable_graph" ];
}; };
"pin-project" = rec { "pin-project" = rec {
crateName = "pin-project"; crateName = "pin-project";
@ -13814,6 +13815,10 @@ rec {
name = "parking_lot"; name = "parking_lot";
packageId = "parking_lot 0.12.1"; packageId = "parking_lot 0.12.1";
} }
{
name = "petgraph";
packageId = "petgraph";
}
{ {
name = "pin-project-lite"; name = "pin-project-lite";
packageId = "pin-project-lite"; packageId = "pin-project-lite";

View file

@ -32,6 +32,7 @@ zstd = "0.13.0"
serde = { version = "1.0.197", features = [ "derive" ] } serde = { version = "1.0.197", features = [ "derive" ] }
serde_with = "3.7.0" serde_with = "3.7.0"
serde_qs = "0.12.0" serde_qs = "0.12.0"
petgraph = "0.6.4"
[dependencies.bigtable_rs] [dependencies.bigtable_rs]
optional = true optional = true

View file

@ -2,7 +2,11 @@ use std::collections::{HashMap, HashSet};
use bstr::ByteSlice; use bstr::ByteSlice;
use tracing::{instrument, trace}; use petgraph::{
graph::{DiGraph, NodeIndex},
visit::Bfs,
};
use tracing::instrument;
use crate::{ use crate::{
proto::{self, Directory}, proto::{self, Directory},
@ -13,14 +17,16 @@ use crate::{
/// Directories), and their insertion order. /// Directories), and their insertion order.
/// ///
/// Directories need to be inserted (via `add`), in an order from the leaves to /// Directories need to be inserted (via `add`), in an order from the leaves to
/// the root. During insertion, We validate as much as we can at that time: /// the root (DFS Post-Order).
/// During insertion, We validate as much as we can at that time:
/// ///
/// - individual validation of Directory messages /// - individual validation of Directory messages
/// - validation of insertion order (no upload of not-yet-known Directories) /// - validation of insertion order (no upload of not-yet-known Directories)
/// - validation of size fields of referred Directories /// - validation of size fields of referred Directories
/// ///
/// Internally it keeps all received Directories (and their sizes) in a HashMap, /// Internally it keeps all received Directories in a directed graph,
/// keyed by digest. /// with node weights being the Directories and edges pointing to child
/// directories.
/// ///
/// Once all Directories have been inserted, a finalize function can be /// Once all Directories have been inserted, a finalize function can be
/// called to get a (deduplicated and) validated list of directories, in /// called to get a (deduplicated and) validated list of directories, in
@ -29,23 +35,29 @@ use crate::{
/// there's no disconnected components, and only one root. /// there's no disconnected components, and only one root.
#[derive(Default)] #[derive(Default)]
pub struct ClosureValidator { pub struct ClosureValidator {
directories_and_sizes: HashMap<B3Digest, (Directory, u64)>, // A directed graph, using Directory as node weight, without edge weights.
// Edges point from parents to children.
graph: DiGraph<Directory, ()>,
/// Keeps track of the last-inserted directory digest. Used to start the // A lookup table from directory digest to node index.
/// connectivity check. digest_to_node_ix: HashMap<B3Digest, NodeIndex>,
last_directory_digest: Option<B3Digest>,
/// Keeps track of the last-inserted directory graph node index.
/// On a correct insert, this will be the root node, from which the DFS post
/// order traversal will start from.
last_directory_ix: Option<NodeIndex>,
} }
impl ClosureValidator { impl ClosureValidator {
/// Insert a new Directory into the closure. /// Insert a new Directory into the closure.
/// Perform individual Directory validation, validation of insertion order /// Perform individual Directory validation, validation of insertion order
/// and size fields. /// and size fields.
#[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)]
pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> { pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> {
let digest = directory.digest(); let digest = directory.digest();
if self.directories_and_sizes.contains_key(&digest) { // If we already saw this node previously, it's already validated and in the graph.
trace!("already seen, skipping"); if self.digest_to_node_ix.contains_key(&digest) {
return Ok(()); return Ok(());
} }
@ -55,35 +67,51 @@ impl ClosureValidator {
.map_err(|e| Error::InvalidRequest(e.to_string()))?; .map_err(|e| Error::InvalidRequest(e.to_string()))?;
// Ensure the directory only refers to directories which we already accepted. // Ensure the directory only refers to directories which we already accepted.
// We lookup their node indices and add them to a HashSet.
let mut child_ixs = HashSet::new();
for dir in &directory.directories { for dir in &directory.directories {
let dir_dgst = B3Digest::try_from(dir.digest.to_owned()).unwrap(); // validated let child_digest = B3Digest::try_from(dir.digest.to_owned()).unwrap(); // validated
// Ensure the digest has already been seen // Ensure the digest has already been seen
let (_, recorded_dir_size) = let child_ix = *self.digest_to_node_ix.get(&child_digest).ok_or_else(|| {
self.directories_and_sizes.get(&dir_dgst).ok_or_else(|| {
Error::InvalidRequest(format!( Error::InvalidRequest(format!(
"'{}' refers to unseen child dir: {}", "'{}' refers to unseen child dir: {}",
dir.name.as_bstr(), dir.name.as_bstr(),
dir_dgst &child_digest
)) ))
})?; })?;
// Ensure the size specified in the child node matches the directory size itself.
let recorded_child_size = self
.graph
.node_weight(child_ix)
.expect("node not found")
.size();
// Ensure the size specified in the child node matches our records. // Ensure the size specified in the child node matches our records.
if dir.size != *recorded_dir_size { if dir.size != recorded_child_size {
return Err(Error::InvalidRequest(format!( return Err(Error::InvalidRequest(format!(
"'{}' has wrong size, specified {}, recorded {}", "'{}' has wrong size, specified {}, recorded {}",
dir.name.as_bstr(), dir.name.as_bstr(),
dir.size, dir.size,
recorded_dir_size recorded_child_size
))); )));
} }
child_ixs.insert(child_ix);
} }
trace!("inserting"); // Insert node into the graph, and add edges to all children.
let directory_size = directory.size(); let node_ix = self.graph.add_node(directory);
self.directories_and_sizes for child_ix in child_ixs {
.insert(digest.clone(), (directory, directory_size)); self.graph.add_edge(node_ix, child_ix, ());
self.last_directory_digest = Some(digest); }
// Record the mapping from digest to node_ix in our lookup table.
self.digest_to_node_ix.insert(digest, node_ix);
// Update last_directory_ix.
self.last_directory_ix = Some(node_ix);
Ok(()) Ok(())
} }
@ -93,77 +121,61 @@ impl ClosureValidator {
/// order. /// order.
/// In case no elements have been inserted, returns an empty list. /// In case no elements have been inserted, returns an empty list.
#[instrument(level = "trace", skip_all, err)] #[instrument(level = "trace", skip_all, err)]
pub(crate) fn finalize(mut self) -> Result<Vec<Directory>, Error> { pub(crate) fn finalize(self) -> Result<Vec<Directory>, Error> {
if self.last_directory_digest.is_none() { // If no nodes were inserted, an empty list is returned.
return Ok(vec![]); let last_directory_ix = if let Some(x) = self.last_directory_ix {
} x
let root_digest = self.last_directory_digest.unwrap();
// recursively walk all directories reachable from there,
// ensure we visited all nodes that were uploaded.
// If not, we might have received multiple disconnected graphs.
// The list of directories we know we need to visit.
// Once we're finished working, and (there's no errors), this in reversed order will
// be a valid insertion order, and directories_and_sizes will be empty.
let mut dirs_to_visit = Vec::with_capacity(self.directories_and_sizes.len());
dirs_to_visit.push(
self.directories_and_sizes
.remove(&root_digest)
.expect("root digest not found")
.0,
);
// The set of digests seen while going visiting all directories.
let mut seen_dir_digests = HashSet::new();
// This loop moves gradually to the end of `dirs_to_visit`, while it's being
// extended.
// The loop stops once it reaches the end.
let mut i = 0;
while let Some(directory) = dirs_to_visit.get(i).map(|d| d.to_owned()) {
// lookup all child directories and put them in the back of dirs_to_visit,
// if they're not already there.
for child_dir in &directory.directories {
let child_digest = B3Digest::try_from(child_dir.digest.to_owned()).unwrap(); // validated
// In case the digest is neither already visited nor already in dirs_to_visit,
// add it to the end of it.
// We don't need to check for the hash we're currently
// visiting, as we can't self-reference.
if !seen_dir_digests.contains(&child_digest) {
dirs_to_visit.push(
self.directories_and_sizes
.remove(&child_digest)
.expect("child digest not found")
.0,
);
seen_dir_digests.insert(child_digest);
}
}
i += 1;
}
// check directories_and_sizes is empty.
if !self.directories_and_sizes.is_empty() {
if cfg!(debug_assertions) {
return Err(Error::InvalidRequest(format!(
"found {} disconnected nodes: {:?}",
self.directories_and_sizes.len(),
self.directories_and_sizes
)));
} else { } else {
return Ok(vec![]);
};
// do a BFS traversal of the graph, starting with the root node to get
// (the count of) all nodes reachable from there.
let mut traversal = Bfs::new(&self.graph, last_directory_ix);
let mut visited_directory_count = 0;
#[cfg(debug_assertions)]
let mut visited_directory_ixs = HashSet::new();
while let Some(directory_ix) = traversal.next(&self.graph) {
#[cfg(debug_assertions)]
visited_directory_ixs.insert(directory_ix);
visited_directory_count += 1;
}
// If the number of nodes collected equals the total number of nodes in
// the graph, we know all nodes are connected.
if visited_directory_count != self.graph.node_count() {
// more or less exhaustive error reporting.
#[cfg(debug_assertions)]
{
let all_directory_ixs: HashSet<_> = self.graph.node_indices().collect();
let unvisited_directories: HashSet<_> = all_directory_ixs
.difference(&visited_directory_ixs)
.map(|ix| self.graph.node_weight(*ix).expect("node not found"))
.collect();
return Err(Error::InvalidRequest(format!( return Err(Error::InvalidRequest(format!(
"found {} disconnected nodes", "found {} disconnected directories: {:?}",
self.directories_and_sizes.len() self.graph.node_count() - visited_directory_ixs.len(),
unvisited_directories
)));
}
#[cfg(not(debug_assertions))]
{
return Err(Error::InvalidRequest(format!(
"found {} disconnected directories",
self.graph.node_count() - visited_directory_count
))); )));
} }
} }
// Reverse to have correct insertion order. // Dissolve the graph, returning the nodes as a Vec.
dirs_to_visit.reverse(); // As the graph was populated in a valid DFS PostOrder, we can return
// nodes in that same order.
Ok(dirs_to_visit) let (nodes, _edges) = self.graph.into_nodes_edges();
Ok(nodes.into_iter().map(|x| x.weight).collect())
} }
} }
@ -237,13 +249,13 @@ mod tests {
} }
} }
// everything was uploaded successfully. Test drain(). // everything was uploaded successfully. Test finalize().
let resp = dcv.finalize(); let resp = dcv.finalize();
match exp_finalize { match exp_finalize {
Some(exp_drain) => { Some(directories) => {
assert_eq!( assert_eq!(
Vec::from_iter(exp_drain.iter().map(|e| (*e).to_owned())), Vec::from_iter(directories.iter().map(|e| (*e).to_owned())),
resp.expect("drain should succeed") resp.expect("drain should succeed")
); );
} }