refactor(tvix/castore): break down ingest_path
In one function that does the heavy lifting: `ingest_entries`, and three additional helpers: - `walk_path_for_ingestion` which performs the tree walking in a very naive way and can be replaced by the user - `leveled_entries_to_stream` which transforms a list of lists of entries, ordered by their depth in the tree, into a stream of entries in bottom-to-top order (Merkle-compatible order, I will say in the future). - `ingest_path` which calls the previous functions. Change-Id: I724b972d3c5bffc033f03363255eae448f017cef Reviewed-on: https://cl.tvl.fyi/c/depot/+/10573 Tested-by: BuildkiteCI Reviewed-by: flokli <flokli@flokli.de> Autosubmit: raitobezarius <tvl@lahfa.xyz>
This commit is contained in:
parent
1f1a42b4da
commit
7275288f0e
1 changed files with 185 additions and 68 deletions
|
@ -7,6 +7,9 @@ use crate::proto::DirectoryNode;
|
|||
use crate::proto::FileNode;
|
||||
use crate::proto::SymlinkNode;
|
||||
use crate::Error as CastoreError;
|
||||
use async_stream::stream;
|
||||
use futures::pin_mut;
|
||||
use futures::Stream;
|
||||
use std::os::unix::ffi::OsStrExt;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
|
@ -14,10 +17,14 @@ use std::{
|
|||
os::unix::prelude::PermissionsExt,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
use tokio_stream::StreamExt;
|
||||
use tracing::instrument;
|
||||
use walkdir::DirEntry;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
use std::collections::HashSet;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum Error {
|
||||
#[error("failed to upload directory at {0}: {1}")]
|
||||
|
@ -141,34 +148,24 @@ where
|
|||
todo!("handle other types")
|
||||
}
|
||||
|
||||
/// Ingests the contents at the given path into the tvix store,
|
||||
/// interacting with a [BlobService] and [DirectoryService].
|
||||
/// It returns the root node or an error.
|
||||
/// Walk the filesystem at a given path and returns a level-keyed list of directory entries.
|
||||
///
|
||||
/// It does not follow symlinks at the root, they will be ingested as actual
|
||||
/// symlinks.
|
||||
/// This is how [`ingest_path`] assembles the set of entries to pass on to [`ingest_entries`].
|
||||
/// This low-level function can be used if additional filtering or processing is required on the
|
||||
/// entries.
|
||||
///
|
||||
/// It's not interacting with a PathInfoService (from tvix-store), or anything
|
||||
/// else giving it a "non-content-addressed name".
|
||||
/// It's up to the caller to possibly register it somewhere (and potentially
|
||||
/// rename it based on some naming scheme)
|
||||
#[instrument(skip(blob_service, directory_service), fields(path=?p), err)]
|
||||
pub async fn ingest_path<'a, BS, DS, P>(
|
||||
blob_service: BS,
|
||||
directory_service: DS,
|
||||
p: P,
|
||||
) -> Result<Node, Error>
|
||||
/// Level here is in the context of graph theory, e.g. 2-level nodes
|
||||
/// are nodes that are at depth 2.
|
||||
///
|
||||
/// This function will walk the filesystem using `walkdir` and will consume
|
||||
/// `O(#number of entries)` space.
|
||||
#[instrument(fields(path), err)]
|
||||
pub fn walk_path_for_ingestion<P>(path: P) -> Result<Vec<Vec<DirEntry>>, Error>
|
||||
where
|
||||
P: AsRef<Path> + Debug,
|
||||
BS: AsRef<dyn BlobService> + Clone,
|
||||
DS: AsRef<dyn DirectoryService>,
|
||||
P: AsRef<Path> + std::fmt::Debug,
|
||||
{
|
||||
let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
|
||||
|
||||
let mut directory_putter = directory_service.as_ref().put_multiple_start();
|
||||
|
||||
let mut entries_per_depths: Vec<Vec<DirEntry>> = vec![Vec::new()];
|
||||
for entry in WalkDir::new(p.as_ref())
|
||||
for entry in WalkDir::new(path.as_ref())
|
||||
.follow_links(false)
|
||||
.follow_root_links(false)
|
||||
.contents_first(false)
|
||||
|
@ -178,7 +175,7 @@ where
|
|||
// Entry could be a NotFound, if the root path specified does not exist.
|
||||
let entry = entry.map_err(|e| {
|
||||
Error::UnableToOpen(
|
||||
PathBuf::from(p.as_ref()),
|
||||
PathBuf::from(path.as_ref()),
|
||||
e.into_io_error().expect("walkdir err must be some"),
|
||||
)
|
||||
})?;
|
||||
|
@ -197,59 +194,179 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
debug_assert!(!entries_per_depths[0].is_empty(), "No root node available!");
|
||||
Ok(entries_per_depths)
|
||||
}
|
||||
|
||||
/// Convert a leveled-key vector of filesystem entries into a stream of
|
||||
/// [DirEntry] in a way that honors the Merkle invariant, i.e. from bottom to top.
|
||||
pub fn leveled_entries_to_stream(
|
||||
entries_per_depths: Vec<Vec<DirEntry>>,
|
||||
) -> impl Stream<Item = DirEntry> {
|
||||
stream! {
|
||||
for level in entries_per_depths.into_iter().rev() {
|
||||
for entry in level.into_iter() {
|
||||
yield entry;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Ingests the contents at a given path into the tvix store, interacting with a [BlobService] and
|
||||
/// [DirectoryService]. It returns the root node or an error.
|
||||
///
|
||||
/// It does not follow symlinks at the root, they will be ingested as actual symlinks.
|
||||
#[instrument(skip(blob_service, directory_service), fields(path), err)]
|
||||
pub async fn ingest_path<'a, BS, DS, P>(
|
||||
blob_service: BS,
|
||||
directory_service: DS,
|
||||
path: P,
|
||||
) -> Result<Node, Error>
|
||||
where
|
||||
P: AsRef<Path> + std::fmt::Debug,
|
||||
BS: AsRef<dyn BlobService> + Clone,
|
||||
DS: AsRef<dyn DirectoryService>,
|
||||
{
|
||||
let entries = walk_path_for_ingestion(path)?;
|
||||
let entries_stream = leveled_entries_to_stream(entries);
|
||||
pin_mut!(entries_stream);
|
||||
|
||||
ingest_entries(blob_service, directory_service, entries_stream).await
|
||||
}
|
||||
|
||||
/// The Merkle invariant checker is an internal structure to perform bookkeeping of all directory
|
||||
/// entries we are ingesting and verifying we are ingesting them in the right order.
|
||||
///
|
||||
/// That is, whenever we process an entry `L`, we would like to verify if we didn't process earlier
|
||||
/// an entry `P` such that `P` is an **ancestor** of `L`.
|
||||
///
|
||||
/// If such a thing happened, it means that we have processed something like:
|
||||
///
|
||||
///```no_trust
|
||||
/// A
|
||||
/// / \
|
||||
/// B C
|
||||
/// / \ \
|
||||
/// G F P <--------- processed before this one
|
||||
/// / \ |
|
||||
/// D E |
|
||||
/// \ |
|
||||
/// L <-----------------------------+
|
||||
/// ```
|
||||
///
|
||||
/// This is exactly what must never happen.
|
||||
///
|
||||
/// Note: this checker is local, it can only see what happens on our side, not on the remote side,
|
||||
/// i.e. the different remote services.
|
||||
#[derive(Default)]
|
||||
#[cfg(debug_assertions)]
|
||||
struct MerkleInvariantChecker {
|
||||
seen: HashSet<PathBuf>,
|
||||
}
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
impl MerkleInvariantChecker {
|
||||
/// See a directory entry and remember it.
|
||||
fn see(&mut self, node: &DirEntry) {
|
||||
self.seen.insert(node.path().to_owned());
|
||||
}
|
||||
|
||||
/// Returns a potential ancestor already seen for that directory entry.
|
||||
fn find_ancestor(&self, node: &DirEntry) -> Option<PathBuf> {
|
||||
for anc in node.path().ancestors() {
|
||||
if self.seen.contains(anc) {
|
||||
return Some(anc.to_owned());
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Ingests elements from the given stream of [`DirEntry`] into the passed [`BlobService`] and
|
||||
/// [`DirectoryService`].
|
||||
/// It does not follow symlinks at the root, they will be ingested as actual symlinks.
|
||||
#[instrument(skip_all, ret, err)]
|
||||
pub async fn ingest_entries<'a, BS, DS, S>(
|
||||
blob_service: BS,
|
||||
directory_service: DS,
|
||||
mut entries_async_iterator: S,
|
||||
) -> Result<Node, Error>
|
||||
where
|
||||
BS: AsRef<dyn BlobService> + Clone,
|
||||
DS: AsRef<dyn DirectoryService>,
|
||||
S: Stream<Item = DirEntry> + std::marker::Unpin,
|
||||
{
|
||||
let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
|
||||
|
||||
let mut directory_putter = directory_service.as_ref().put_multiple_start();
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
let mut invariant_checker: MerkleInvariantChecker = Default::default();
|
||||
|
||||
// We need to process a directory's children before processing
|
||||
// the directory itself in order to have all the data needed
|
||||
// to compute the hash.
|
||||
for level in entries_per_depths.into_iter().rev() {
|
||||
for entry in level.into_iter() {
|
||||
// FUTUREWORK: inline `process_entry`
|
||||
let node = process_entry(
|
||||
blob_service.clone(),
|
||||
&mut directory_putter,
|
||||
&entry,
|
||||
// process_entry wants an Option<Directory> in case the entry points to a directory.
|
||||
// make sure to provide it.
|
||||
// If the directory has contents, we already have it in
|
||||
// `directories` because we iterate over depth in reverse order (deepest to
|
||||
// shallowest).
|
||||
if entry.file_type().is_dir() {
|
||||
Some(
|
||||
directories
|
||||
.remove(entry.path())
|
||||
// In that case, it contained no children
|
||||
.unwrap_or_default(),
|
||||
)
|
||||
} else {
|
||||
None
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
while let Some(entry) = entries_async_iterator.next().await {
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
// If we find an ancestor before we see this entry, this means that the caller
|
||||
// broke the contract, refer to the documentation of the invariant checker to
|
||||
// understand the reasoning here.
|
||||
if let Some(ancestor) = invariant_checker.find_ancestor(&entry) {
|
||||
panic!("Tvix bug: merkle invariant checker discovered that {} was processed before {}!",
|
||||
ancestor.display(),
|
||||
entry.path().display()
|
||||
);
|
||||
}
|
||||
|
||||
if entry.depth() == 0 {
|
||||
// Make sure all the directories are flushed.
|
||||
// FUTUREWORK: `debug_assert!` the resulting Ok(b3_digest) to be equal
|
||||
// to `directories.get(entry.path())`.
|
||||
if entry.file_type().is_dir() {
|
||||
directory_putter.close().await?;
|
||||
}
|
||||
return Ok(node);
|
||||
invariant_checker.see(&entry);
|
||||
}
|
||||
|
||||
// FUTUREWORK: handle directory putting here, rather than piping it through process_entry.
|
||||
let node = process_entry(
|
||||
blob_service.clone(),
|
||||
&mut directory_putter,
|
||||
&entry,
|
||||
// process_entry wants an Option<Directory> in case the entry points to a directory.
|
||||
// make sure to provide it.
|
||||
// If the directory has contents, we already have it in
|
||||
// `directories` because we iterate over depth in reverse order (deepest to
|
||||
// shallowest).
|
||||
if entry.file_type().is_dir() {
|
||||
Some(
|
||||
directories
|
||||
.remove(entry.path())
|
||||
// In that case, it contained no children
|
||||
.unwrap_or_default(),
|
||||
)
|
||||
} else {
|
||||
// calculate the parent path, and make sure we register the node there.
|
||||
// NOTE: entry.depth() > 0
|
||||
let parent_path = entry.path().parent().unwrap().to_path_buf();
|
||||
None
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
// record node in parent directory, creating a new [proto:Directory] if not there yet.
|
||||
let parent_directory = directories.entry(parent_path).or_default();
|
||||
match node {
|
||||
Node::Directory(e) => parent_directory.directories.push(e),
|
||||
Node::File(e) => parent_directory.files.push(e),
|
||||
Node::Symlink(e) => parent_directory.symlinks.push(e),
|
||||
}
|
||||
if entry.depth() == 0 {
|
||||
// Make sure all the directories are flushed.
|
||||
// FUTUREWORK: `debug_assert!` the resulting Ok(b3_digest) to be equal
|
||||
// to `directories.get(entry.path())`.
|
||||
if entry.file_type().is_dir() {
|
||||
directory_putter.close().await?;
|
||||
}
|
||||
return Ok(node);
|
||||
} else {
|
||||
// calculate the parent path, and make sure we register the node there.
|
||||
// NOTE: entry.depth() > 0
|
||||
let parent_path = entry.path().parent().unwrap().to_path_buf();
|
||||
|
||||
// record node in parent directory, creating a new [proto:Directory] if not there yet.
|
||||
let parent_directory = directories.entry(parent_path).or_default();
|
||||
match node {
|
||||
Node::Directory(e) => parent_directory.directories.push(e),
|
||||
Node::File(e) => parent_directory.files.push(e),
|
||||
Node::Symlink(e) => parent_directory.symlinks.push(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
// unreachable, we already bailed out before if root doesn't exist.
|
||||
unreachable!()
|
||||
unreachable!("Tvix bug: no root node emitted during ingestion")
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue