refactor(tvix/castore/import): restructure directory uploader a bit

Have an Option<Box<dyn DirectoryPutter>>, which is lazily initialized
the first time we want to upload a directory.

Have the loop explicitly break when it encounters the root_node, and
deal with the flushing after the loop.

Resolve the FUTUREWORK: debug-assert that the root directory digest
matches the digest the DirectoryPutter returns when it is closed.

Change-Id: Iefc4904d8b8387e868fb752d40e3e4e4218c7407
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11417
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: Connor Brewster <cbrewster@hey.com>
This commit is contained in:
Florian Klink 2024-04-13 19:57:34 +03:00 committed by clbot
parent c088123d4e
commit 6ebaa7b88a

View file

@ -1,4 +1,5 @@
use crate::blobservice::BlobService; use crate::blobservice::BlobService;
use crate::directoryservice::DirectoryPutter;
use crate::directoryservice::DirectoryService; use crate::directoryservice::DirectoryService;
use crate::proto::node::Node; use crate::proto::node::Node;
use crate::proto::Directory; use crate::proto::Directory;
@ -207,10 +208,6 @@ where
DS: AsRef<dyn DirectoryService>, DS: AsRef<dyn DirectoryService>,
S: Stream<Item = DirEntry> + std::marker::Unpin, S: Stream<Item = DirEntry> + std::marker::Unpin,
{ {
let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
let mut directory_putter = directory_service.as_ref().put_multiple_start();
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
let mut invariant_checker: MerkleInvariantChecker = Default::default(); let mut invariant_checker: MerkleInvariantChecker = Default::default();
@ -230,14 +227,29 @@ where
invariant_checker.see(e); invariant_checker.see(e);
}); });
// For a given path, this holds the [Directory] structs as they are populated.
let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
let mut maybe_directory_putter: Option<Box<dyn DirectoryPutter>> = None;
// We need to process a directory's children before processing // We need to process a directory's children before processing
// the directory itself in order to have all the data needed // the directory itself in order to have all the data needed
// to compute the hash. // to compute the hash.
while let Some(entry) = direntry_stream.next().await {
let root_node = loop {
let entry = match direntry_stream.next().await {
Some(entry) => entry,
None => {
// The last entry of the stream must have depth 0, after which
// we break the loop manually.
panic!("Tvix bug: unexpected end of stream");
}
};
let file_type = entry.file_type(); let file_type = entry.file_type();
let node = if file_type.is_dir() { let node = if file_type.is_dir() {
// if the entry was a directory, use the directory_putter to upload the Directory. // If the entry is a directory, we traversed all its children (and
// populated it in `directories`).
// If we don't have it in there, it's an empty directory.
let directory = directories let directory = directories
.remove(entry.path()) .remove(entry.path())
// In that case, it contained no children // In that case, it contained no children
@ -246,7 +258,13 @@ where
let directory_size = directory.size(); let directory_size = directory.size();
let directory_digest = directory.digest(); let directory_digest = directory.digest();
directory_putter.put(directory).await?; // Use the directory_putter to upload the directory.
// If we don't have one yet (as that's the first one to upload),
// initialize the putter.
maybe_directory_putter
.get_or_insert_with(|| directory_service.as_ref().put_multiple_start())
.put(directory)
.await?;
Node::Directory(DirectoryNode { Node::Directory(DirectoryNode {
name: entry.file_name().as_bytes().to_owned().into(), name: entry.file_name().as_bytes().to_owned().into(),
@ -301,13 +319,7 @@ where
}; };
if entry.depth() == 0 { if entry.depth() == 0 {
// Make sure all the directories are flushed. break node;
// FUTUREWORK: `debug_assert!` the resulting Ok(b3_digest) to be equal
// to `directories.get(entry.path())`.
if entry.file_type().is_dir() {
directory_putter.close().await?;
}
return Ok(node);
} else { } else {
// calculate the parent path, and make sure we register the node there. // calculate the parent path, and make sure we register the node there.
// NOTE: entry.depth() > 0 // NOTE: entry.depth() > 0
@ -321,7 +333,29 @@ where
Node::Symlink(e) => parent_directory.symlinks.push(e), Node::Symlink(e) => parent_directory.symlinks.push(e),
} }
} }
} };
// unreachable, we already bailed out before if root doesn't exist.
unreachable!("Tvix bug: no root node emitted during ingestion") // if there were directories uploaded, make sure we flush the putter, so
// they're all persisted to the backend.
if let Some(mut directory_putter) = maybe_directory_putter {
let root_directory_digest = directory_putter.close().await?;
#[cfg(debug_assertions)]
{
if let Node::Directory(directory_node) = &root_node {
debug_assert_eq!(
root_directory_digest,
directory_node
.digest
.to_vec()
.try_into()
.expect("invalid digest len")
)
} else {
unreachable!("Tvix bug: directory putter initialized but no root directory node");
}
}
};
Ok(root_node)
} }