feat(tvix/castore): ingestion does DFS and inverts it
To make use of the filtering feature, we need to revert the internal walker to a real DFS. We will therefore just invert the whole tree by storing all of its contents in a level-keyed vector. This is expensive in memory; it is a compromise between CPU and memory. Here is the fundamental reason why: when you encounter a directory, it is either a leaf or not, i.e. it either contains subdirectories or it does not. To know this fact, you can: - wait until you notice subdirectories under it, i.e. you need to store any intermediate nodes you see in the meantime -> memory penalty. - getdents or readdir on it to determine *NOW* its subdirectories -> CPU penalty and I/O penalty. This is an implementation of the first proposal: we pay memory. In practice, we are paying O(#nodes) in memory. There is a smarter, albeit much more complicated, algorithm that pays only O(\sum_i #siblings(p_i)) nodes, where (p_1, ..., p_n) is the path to a leaf. This means that for the tree where A has children B and C, B has child D, and C has children E and F, we would never store D, E, F all at the same time, but only E, F at a given time. We would still store B, C no matter what. Change-Id: I456ed1c3f0db493e018ba1182665d84bebe29c11 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10567 Tested-by: BuildkiteCI Autosubmit: raitobezarius <tvl@lahfa.xyz> Reviewed-by: flokli <flokli@flokli.de>
This commit is contained in:
parent
e98ea31bbd
commit
1f1a42b4da
1 changed file with 68 additions and 48 deletions
|
@ -15,6 +15,7 @@ use std::{
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
};
|
};
|
||||||
use tracing::instrument;
|
use tracing::instrument;
|
||||||
|
use walkdir::DirEntry;
|
||||||
use walkdir::WalkDir;
|
use walkdir::WalkDir;
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
@ -58,11 +59,9 @@ impl From<Error> for std::io::Error {
|
||||||
/// For this to work, it relies on the caller to provide the directory object
|
/// For this to work, it relies on the caller to provide the directory object
|
||||||
/// with the previously returned (child) nodes.
|
/// with the previously returned (child) nodes.
|
||||||
///
|
///
|
||||||
/// It assumes entries to be returned in "contents first" order, means this
|
/// It assumes to be called only if all children of it have already been processed. If the entry is
|
||||||
/// will only be called with a directory if all children of it have been
|
/// indeed a directory, it'll also upload that directory to the store. For this, the
|
||||||
/// visited. If the entry is indeed a directory, it'll also upload that
|
/// so-far-assembled Directory object for this path needs to be passed in.
|
||||||
/// directory to the store. For this, the so-far-assembled Directory object for
|
|
||||||
/// this path needs to be passed in.
|
|
||||||
///
|
///
|
||||||
/// It assumes the caller adds returned nodes to the directories it assembles.
|
/// It assumes the caller adds returned nodes to the directories it assembles.
|
||||||
#[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))]
|
#[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))]
|
||||||
|
@ -168,14 +167,13 @@ where
|
||||||
|
|
||||||
let mut directory_putter = directory_service.as_ref().put_multiple_start();
|
let mut directory_putter = directory_service.as_ref().put_multiple_start();
|
||||||
|
|
||||||
|
let mut entries_per_depths: Vec<Vec<DirEntry>> = vec![Vec::new()];
|
||||||
for entry in WalkDir::new(p.as_ref())
|
for entry in WalkDir::new(p.as_ref())
|
||||||
.follow_links(false)
|
.follow_links(false)
|
||||||
.follow_root_links(false)
|
.follow_root_links(false)
|
||||||
// We need to process a directory's children before processing
|
.contents_first(false)
|
||||||
// the directory itself in order to have all the data needed
|
|
||||||
// to compute the hash.
|
|
||||||
.contents_first(true)
|
|
||||||
.sort_by_file_name()
|
.sort_by_file_name()
|
||||||
|
.into_iter()
|
||||||
{
|
{
|
||||||
// Entry could be a NotFound, if the root path specified does not exist.
|
// Entry could be a NotFound, if the root path specified does not exist.
|
||||||
let entry = entry.map_err(|e| {
|
let entry = entry.map_err(|e| {
|
||||||
|
@ -185,48 +183,70 @@ where
|
||||||
)
|
)
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
// process_entry wants an Option<Directory> in case the entry points to a directory.
|
if entry.depth() >= entries_per_depths.len() {
|
||||||
// make sure to provide it.
|
debug_assert!(
|
||||||
// If the directory has contents, we already have it in
|
entry.depth() == entries_per_depths.len(),
|
||||||
// `directories` due to the use of contents_first on WalkDir.
|
"Received unexpected entry with depth {} during descent, previously at {}",
|
||||||
let maybe_directory: Option<Directory> = {
|
entry.depth(),
|
||||||
if entry.file_type().is_dir() {
|
entries_per_depths.len()
|
||||||
Some(
|
);
|
||||||
directories
|
|
||||||
.entry(entry.path().to_path_buf())
|
|
||||||
.or_default()
|
|
||||||
.clone(),
|
|
||||||
)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let node = process_entry(
|
entries_per_depths.push(vec![entry]);
|
||||||
blob_service.clone(),
|
|
||||||
&mut directory_putter,
|
|
||||||
&entry,
|
|
||||||
maybe_directory,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
if entry.depth() == 0 {
|
|
||||||
// Make sure all the directories are flushed.
|
|
||||||
if entry.file_type().is_dir() {
|
|
||||||
directory_putter.close().await?;
|
|
||||||
}
|
|
||||||
return Ok(node);
|
|
||||||
} else {
|
} else {
|
||||||
// calculate the parent path, and make sure we register the node there.
|
entries_per_depths[entry.depth()].push(entry);
|
||||||
// NOTE: entry.depth() > 0
|
}
|
||||||
let parent_path = entry.path().parent().unwrap().to_path_buf();
|
}
|
||||||
|
|
||||||
// record node in parent directory, creating a new [proto:Directory] if not there yet.
|
debug_assert!(!entries_per_depths[0].is_empty(), "No root node available!");
|
||||||
let parent_directory = directories.entry(parent_path).or_default();
|
|
||||||
match node {
|
// We need to process a directory's children before processing
|
||||||
Node::Directory(e) => parent_directory.directories.push(e),
|
// the directory itself in order to have all the data needed
|
||||||
Node::File(e) => parent_directory.files.push(e),
|
// to compute the hash.
|
||||||
Node::Symlink(e) => parent_directory.symlinks.push(e),
|
for level in entries_per_depths.into_iter().rev() {
|
||||||
|
for entry in level.into_iter() {
|
||||||
|
// FUTUREWORK: inline `process_entry`
|
||||||
|
let node = process_entry(
|
||||||
|
blob_service.clone(),
|
||||||
|
&mut directory_putter,
|
||||||
|
&entry,
|
||||||
|
// process_entry wants an Option<Directory> in case the entry points to a directory.
|
||||||
|
// make sure to provide it.
|
||||||
|
// If the directory has contents, we already have it in
|
||||||
|
// `directories` because we iterate over depth in reverse order (deepest to
|
||||||
|
// shallowest).
|
||||||
|
if entry.file_type().is_dir() {
|
||||||
|
Some(
|
||||||
|
directories
|
||||||
|
.remove(entry.path())
|
||||||
|
// In that case, it contained no children
|
||||||
|
.unwrap_or_default(),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if entry.depth() == 0 {
|
||||||
|
// Make sure all the directories are flushed.
|
||||||
|
// FUTUREWORK: `debug_assert!` the resulting Ok(b3_digest) to be equal
|
||||||
|
// to `directories.get(entry.path())`.
|
||||||
|
if entry.file_type().is_dir() {
|
||||||
|
directory_putter.close().await?;
|
||||||
|
}
|
||||||
|
return Ok(node);
|
||||||
|
} else {
|
||||||
|
// calculate the parent path, and make sure we register the node there.
|
||||||
|
// NOTE: entry.depth() > 0
|
||||||
|
let parent_path = entry.path().parent().unwrap().to_path_buf();
|
||||||
|
|
||||||
|
// record node in parent directory, creating a new [proto:Directory] if not there yet.
|
||||||
|
let parent_directory = directories.entry(parent_path).or_default();
|
||||||
|
match node {
|
||||||
|
Node::Directory(e) => parent_directory.directories.push(e),
|
||||||
|
Node::File(e) => parent_directory.files.push(e),
|
||||||
|
Node::Symlink(e) => parent_directory.symlinks.push(e),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue