refactor(tvix/castore): generalize store ingestion streams

Previously the store ingestion code was coupled to `walkdir::DirEntry`s
produced by the `walkdir` crate, which made it impossible to reuse it
for ingesting from other sources such as tarballs or NARs.

This introduces an `IngestionEntry`, which carries enough information
for store ingestion and a future for computing the Blake3 digest of
files. This allows the producer to perform file uploads in a way that
makes sense for the source, e.g. the filesystem ingestor can upload
multiple files concurrently, while the NAR ingestor needs to ingest an
entire blob before yielding the next blob in the stream. In the future
we can buffer small blobs and upload them concurrently, but the full
blob still needs to be read from the NAR before advancing.
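
As an illustration, a producer could hand the ingestor an entry like the
following minimal sketch. The `make_regular_entry` helper is hypothetical,
it assumes `IngestionEntry` and `Error` are exposed from
`tvix_castore::import`, and error handling is simplified:

```rust
use std::path::PathBuf;

use tvix_castore::blobservice::BlobService;
use tvix_castore::import::{Error, IngestionEntry};

/// Hypothetical helper: builds a `Regular` ingestion entry whose digest
/// future uploads the given bytes to the blob service and resolves to
/// their Blake3 digest. The upload only runs once the consumer awaits
/// the future, so the producer stays in control of concurrency.
fn make_regular_entry<'a, BS>(
    blob_service: BS,
    path: PathBuf,
    contents: Vec<u8>,
    executable: bool,
) -> IngestionEntry<'a>
where
    BS: BlobService + 'a,
{
    let size = contents.len() as u64;
    let digest_path = path.clone();

    IngestionEntry::Regular {
        path,
        size,
        executable,
        digest: Box::pin(async move {
            let mut writer = blob_service.open_write().await;
            tokio::io::copy(&mut contents.as_slice(), &mut writer)
                .await
                .map_err(|e| Error::UnableToRead(digest_path.clone(), e))?;
            writer
                .close()
                .await
                .map_err(|e| Error::UnableToRead(digest_path, e))
        }),
    }
}
```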

Change-Id: I6d144063e2ba5b05e765bac1f27d41b3c8e7b283
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11462
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Connor Brewster 2024-04-18 13:51:28 -05:00
parent 150106610e
commit 259d7a3cfa
5 changed files with 256 additions and 258 deletions

@@ -6,11 +6,14 @@ use crate::proto::Directory;
use crate::proto::DirectoryNode;
use crate::proto::FileNode;
use crate::proto::SymlinkNode;
use crate::B3Digest;
use crate::Error as CastoreError;
use async_stream::stream;
use futures::pin_mut;
use futures::stream::BoxStream;
use futures::Future;
use futures::{Stream, StreamExt};
use std::fs::FileType;
use std::os::unix::fs::MetadataExt;
use std::pin::Pin;
use tracing::Level;
#[cfg(target_family = "unix")]
@@ -26,9 +29,6 @@ use tracing::instrument;
use walkdir::DirEntry;
use walkdir::WalkDir;
#[cfg(debug_assertions)]
use std::collections::HashSet;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("failed to upload directory at {0}: {1}")]
@@ -65,274 +65,253 @@ impl From<Error> for std::io::Error {
}
}
/// Walk the filesystem at a given path and returns a level-keyed list of directory entries.
/// Walk the filesystem at a given path and returns a stream of ingestion entries.
///
/// This is how [`ingest_path`] assembles the set of entries to pass on to [`ingest_entries`].
/// This low-level function can be used if additional filtering or processing is required on the
/// entries.
///
/// Level here is in the context of graph theory, e.g. 2-level nodes
/// are nodes that are at depth 2.
/// It does not follow symlinks at the root, they will be ingested as actual symlinks.
///
/// This function will walk the filesystem using `walkdir` and will consume
/// `O(#number of entries)` space.
#[instrument(fields(path), err)]
pub fn walk_path_for_ingestion<P>(path: P) -> Result<Vec<Vec<DirEntry>>, Error>
#[instrument(fields(path), skip(blob_service))]
fn walk_path_for_ingestion<'a, BS>(
blob_service: BS,
path: &'a Path,
) -> BoxStream<'a, Result<IngestionEntry<'a>, Error>>
where
P: AsRef<Path> + std::fmt::Debug,
BS: BlobService + Clone + 'a,
{
let mut entries_per_depths: Vec<Vec<DirEntry>> = vec![Vec::new()];
for entry in WalkDir::new(path.as_ref())
let iter = WalkDir::new(path)
.follow_links(false)
.follow_root_links(false)
.contents_first(false)
.sort_by_file_name()
.into_iter()
{
// Entry could be a NotFound, if the root path specified does not exist.
let entry = entry.map_err(|e| {
Error::UnableToOpen(
PathBuf::from(path.as_ref()),
e.into_io_error().expect("walkdir err must be some"),
)
})?;
.contents_first(true)
.into_iter();
if entry.depth() >= entries_per_depths.len() {
debug_assert!(
entry.depth() == entries_per_depths.len(),
"Received unexpected entry with depth {} during descent, previously at {}",
entry.depth(),
entries_per_depths.len()
);
entries_per_depths.push(vec![entry]);
} else {
entries_per_depths[entry.depth()].push(entry);
}
}
Ok(entries_per_depths)
dir_entry_iter_to_ingestion_stream(blob_service, iter, path)
}
/// Convert a leveled-key vector of filesystem entries into a stream of
/// [DirEntry] in a way that honors the Merkle invariant, i.e. from bottom to top.
pub fn leveled_entries_to_stream(
entries_per_depths: Vec<Vec<DirEntry>>,
) -> impl Stream<Item = DirEntry> {
stream! {
for level in entries_per_depths.into_iter().rev() {
for entry in level.into_iter() {
yield entry;
}
}
/// Converts an iterator of [walkdir::DirEntry]s into a stream of ingestion entries.
/// This can then be fed into [ingest_entries] to ingest all the entries into the castore.
///
/// The root is the [Path] in the filesystem that is being ingested into the castore.
pub fn dir_entry_iter_to_ingestion_stream<'a, BS, I>(
blob_service: BS,
iter: I,
root: &'a Path,
) -> BoxStream<'a, Result<IngestionEntry<'a>, Error>>
where
BS: BlobService + Clone + 'a,
I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + 'a,
{
let prefix = root.parent().unwrap_or_else(|| Path::new(""));
let iter = iter.map(move |entry| match entry {
Ok(entry) => dir_entry_to_ingestion_entry(blob_service.clone(), &entry, prefix),
Err(error) => Err(Error::UnableToStat(
root.to_path_buf(),
error.into_io_error().expect("walkdir err must be some"),
)),
});
Box::pin(futures::stream::iter(iter))
}
/// Converts a [walkdir::DirEntry] into an [IngestionEntry], uploading blobs to the
/// provided [BlobService].
///
/// The prefix path is stripped from the path of each entry. This is usually the parent path
/// of the path being ingested so that the last element of the stream only has one component.
fn dir_entry_to_ingestion_entry<'a, BS>(
blob_service: BS,
entry: &DirEntry,
prefix: &Path,
) -> Result<IngestionEntry<'a>, Error>
where
BS: BlobService + 'a,
{
let file_type = entry.file_type();
let path = entry
.path()
.strip_prefix(prefix)
.expect("Tvix bug: failed to strip root path prefix")
.to_path_buf();
if file_type.is_dir() {
Ok(IngestionEntry::Dir { path })
} else if file_type.is_symlink() {
let target = std::fs::read_link(entry.path())
.map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e))?;
Ok(IngestionEntry::Symlink { path, target })
} else if file_type.is_file() {
let metadata = entry
.metadata()
.map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e.into()))?;
// TODO: In the future, for small files, hash right away and upload async.
let digest = Box::pin(upload_blob_at_path(
blob_service,
entry.path().to_path_buf(),
));
Ok(IngestionEntry::Regular {
path,
size: metadata.size(),
// If it's executable by the user, it'll become executable.
// This matches nix's dump() function behaviour.
executable: metadata.permissions().mode() & 64 != 0,
digest,
})
} else {
Ok(IngestionEntry::Unknown { path, file_type })
}
}
/// Uploads the file at the provided [Path] to the [BlobService].
#[instrument(skip(blob_service), fields(path), err)]
async fn upload_blob_at_path<BS>(blob_service: BS, path: PathBuf) -> Result<B3Digest, Error>
where
BS: BlobService,
{
let mut file = match tokio::fs::File::open(&path).await {
Ok(file) => file,
Err(e) => return Err(Error::UnableToRead(path, e)),
};
let mut writer = blob_service.open_write().await;
if let Err(e) = tokio::io::copy(&mut file, &mut writer).await {
return Err(Error::UnableToRead(path, e));
};
let digest = writer
.close()
.await
.map_err(|e| Error::UnableToRead(path, e))?;
Ok(digest)
}
/// Ingests the contents at a given path into the tvix store, interacting with a [BlobService] and
/// [DirectoryService]. It returns the root node or an error.
///
/// It does not follow symlinks at the root, they will be ingested as actual symlinks.
#[instrument(skip(blob_service, directory_service), fields(path), err)]
pub async fn ingest_path<'a, BS, DS, P>(
pub async fn ingest_path<BS, DS, P>(
blob_service: BS,
directory_service: DS,
path: P,
) -> Result<Node, Error>
where
P: AsRef<Path> + std::fmt::Debug,
BS: AsRef<dyn BlobService>,
BS: BlobService + Clone,
DS: AsRef<dyn DirectoryService>,
{
// produce the leveled-key vector of DirEntry.
let entries_per_depths = walk_path_for_ingestion(path)?;
let direntry_stream = leveled_entries_to_stream(entries_per_depths);
pin_mut!(direntry_stream);
ingest_entries(blob_service, directory_service, direntry_stream).await
let entry_stream = walk_path_for_ingestion(blob_service, path.as_ref());
ingest_entries(directory_service, entry_stream).await
}
/// The Merkle invariant checker is an internal structure to perform bookkeeping of all directory
/// entries we are ingesting and verifying we are ingesting them in the right order.
/// Ingests elements from the given stream of [IngestionEntry] into the passed [DirectoryService].
///
/// That is, whenever we process an entry `L`, we would like to verify if we didn't process earlier
/// an entry `P` such that `P` is an **ancestor** of `L`.
/// The stream must have the following invariants:
/// - All children entries must come before their parents.
/// - The last entry must be the root node which must have a single path component.
/// - Every entry should have a unique path.
///
/// If such a thing happened, it means that we have processed something like:
/// Internally we maintain a [HashMap] of [PathBuf] to partially populated [Directory] at that
/// path. Once we receive an [IngestionEntry] for the directory itself, we remove it from the
/// map and upload it to the [DirectoryService] through a lazily created [DirectoryPutter].
///
///```no_trust
/// A
/// / \
/// B C
/// / \ \
/// G F P <--------- processed before this one
/// / \ |
/// D E |
/// \ |
/// L <-----------------------------+
/// ```
///
/// This is exactly what must never happen.
///
/// Note: this checker is local, it can only see what happens on our side, not on the remote side,
/// i.e. the different remote services.
#[derive(Default)]
#[cfg(debug_assertions)]
struct MerkleInvariantChecker {
seen: HashSet<PathBuf>,
}
#[cfg(debug_assertions)]
impl MerkleInvariantChecker {
/// See a directory entry and remember it.
fn see(&mut self, node: &DirEntry) {
self.seen.insert(node.path().to_owned());
}
/// Returns a potential ancestor already seen for that directory entry.
fn find_ancestor<'a>(&self, node: &'a DirEntry) -> Option<&'a Path> {
node.path().ancestors().find(|p| self.seen.contains(*p))
}
}
/// Ingests elements from the given stream of [`DirEntry`] into a the passed [`BlobService`] and
/// [`DirectoryService`].
/// It does not follow symlinks at the root, they will be ingested as actual symlinks.
/// On success, returns the root node.
#[instrument(skip_all, ret(level = Level::TRACE), err)]
pub async fn ingest_entries<'a, BS, DS, S>(
blob_service: BS,
pub async fn ingest_entries<'a, DS, S>(
directory_service: DS,
#[allow(unused_mut)] mut direntry_stream: S,
) -> Result<Node, Error>
where
BS: AsRef<dyn BlobService>,
DS: AsRef<dyn DirectoryService>,
S: Stream<Item = DirEntry> + std::marker::Unpin,
S: Stream<Item = Result<IngestionEntry<'a>, Error>> + Send + std::marker::Unpin,
{
#[cfg(debug_assertions)]
let mut invariant_checker: MerkleInvariantChecker = Default::default();
#[cfg(debug_assertions)]
let mut direntry_stream = direntry_stream.inspect(|e| {
// If we find an ancestor before we see this entry, this means that the caller
// broke the contract, refer to the documentation of the invariant checker to
// understand the reasoning here.
if let Some(ancestor) = invariant_checker.find_ancestor(e) {
panic!(
"Tvix bug: merkle invariant checker discovered that {} was processed before {}!",
ancestor.display(),
e.path().display()
);
}
invariant_checker.see(e);
});
// For a given path, this holds the [Directory] structs as they are populated.
let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
let mut maybe_directory_putter: Option<Box<dyn DirectoryPutter>> = None;
// We need to process a directory's children before processing
// the directory itself in order to have all the data needed
// to compute the hash.
let root_node = loop {
let entry = match direntry_stream.next().await {
Some(entry) => entry,
None => {
// The last entry of the stream must have depth 0, after which
// we break the loop manually.
panic!("Tvix bug: unexpected end of stream");
let mut entry = direntry_stream
.next()
.await
// The last entry of the stream must have 1 path component, after which
// we break the loop manually.
.expect("Tvix bug: unexpected end of stream")?;
let name = entry
.path()
.file_name()
// If this is the root node, it will have an empty name.
.unwrap_or_default()
.as_bytes()
.to_owned()
.into();
let node = match &mut entry {
IngestionEntry::Dir { .. } => {
// If the entry is a directory, we traversed all its children (and
// populated it in `directories`).
// If we don't have it in there, it's an empty directory.
let directory = directories
.remove(entry.path())
// In that case, it contained no children
.unwrap_or_default();
let directory_size = directory.size();
let directory_digest = directory.digest();
// Use the directory_putter to upload the directory.
// If we don't have one yet (as that's the first one to upload),
// initialize the putter.
maybe_directory_putter
.get_or_insert_with(|| directory_service.as_ref().put_multiple_start())
.put(directory)
.await?;
Node::Directory(DirectoryNode {
name,
digest: directory_digest.into(),
size: directory_size,
})
}
IngestionEntry::Symlink { ref target, .. } => Node::Symlink(SymlinkNode {
name,
target: target.as_os_str().as_bytes().to_owned().into(),
}),
IngestionEntry::Regular {
size,
executable,
digest,
..
} => Node::File(FileNode {
name,
digest: digest.await?.into(),
size: *size,
executable: *executable,
}),
IngestionEntry::Unknown { path, file_type } => {
return Err(Error::UnsupportedFileType(path.clone(), *file_type));
}
};
let file_type = entry.file_type();
let node = if file_type.is_dir() {
// If the entry is a directory, we traversed all its children (and
// populated it in `directories`).
// If we don't have it in there, it's an empty directory.
let directory = directories
.remove(entry.path())
// In that case, it contained no children
.unwrap_or_default();
let directory_size = directory.size();
let directory_digest = directory.digest();
// Use the directory_putter to upload the directory.
// If we don't have one yet (as that's the first one to upload),
// initialize the putter.
maybe_directory_putter
.get_or_insert_with(|| directory_service.as_ref().put_multiple_start())
.put(directory)
.await?;
Node::Directory(DirectoryNode {
name: entry.file_name().as_bytes().to_owned().into(),
digest: directory_digest.into(),
size: directory_size,
})
} else if file_type.is_symlink() {
let target: bytes::Bytes = std::fs::read_link(entry.path())
.map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e))?
.as_os_str()
.as_bytes()
.to_owned()
.into();
Node::Symlink(SymlinkNode {
name: entry.file_name().as_bytes().to_owned().into(),
target,
})
} else if file_type.is_file() {
let metadata = entry
.metadata()
.map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e.into()))?;
let mut file = tokio::fs::File::open(entry.path())
.await
.map_err(|e| Error::UnableToOpen(entry.path().to_path_buf(), e))?;
let mut writer = blob_service.as_ref().open_write().await;
if let Err(e) = tokio::io::copy(&mut file, &mut writer).await {
return Err(Error::UnableToRead(entry.path().to_path_buf(), e));
};
let digest = writer
.close()
.await
.map_err(|e| Error::UnableToRead(entry.path().to_path_buf(), e))?;
Node::File(FileNode {
name: entry.file_name().as_bytes().to_vec().into(),
digest: digest.into(),
size: metadata.len(),
// If it's executable by the user, it'll become executable.
// This matches nix's dump() function behaviour.
executable: metadata.permissions().mode() & 64 != 0,
})
} else {
return Err(Error::UnsupportedFileType(
entry.path().to_path_buf(),
file_type,
));
};
if entry.depth() == 0 {
if entry.path().components().count() == 1 {
break node;
} else {
// calculate the parent path, and make sure we register the node there.
// NOTE: entry.depth() > 0
let parent_path = entry.path().parent().unwrap().to_path_buf();
// record node in parent directory, creating a new [proto:Directory] if not there yet.
let parent_directory = directories.entry(parent_path).or_default();
match node {
Node::Directory(e) => parent_directory.directories.push(e),
Node::File(e) => parent_directory.files.push(e),
Node::Symlink(e) => parent_directory.symlinks.push(e),
}
}
// record node in parent directory, creating a new [Directory] if not there yet.
directories
.entry(entry.path().parent().unwrap().to_path_buf())
.or_default()
.add(node);
};
// if there were directories uploaded, make sure we flush the putter, so
@@ -359,3 +338,36 @@ where
Ok(root_node)
}
type BlobFut<'a> = Pin<Box<dyn Future<Output = Result<B3Digest, Error>> + Send + 'a>>;
pub enum IngestionEntry<'a> {
Regular {
path: PathBuf,
size: u64,
executable: bool,
digest: BlobFut<'a>,
},
Symlink {
path: PathBuf,
target: PathBuf,
},
Dir {
path: PathBuf,
},
Unknown {
path: PathBuf,
file_type: FileType,
},
}
impl<'a> IngestionEntry<'a> {
fn path(&self) -> &Path {
match self {
IngestionEntry::Regular { path, .. } => path,
IngestionEntry::Symlink { path, .. } => path,
IngestionEntry::Dir { path } => path,
IngestionEntry::Unknown { path, .. } => path,
}
}
}
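
To make the ordering contract of `ingest_entries` concrete, here is a
hedged usage sketch that hand-builds a stream for a tiny tree (a root
directory `foo` containing one symlink). The paths, the symlink target
and the `ingest_example` helper are made up, and the module paths are
assumed to be what `tvix_castore` exports:

```rust
use std::path::PathBuf;

use tvix_castore::directoryservice::DirectoryService;
use tvix_castore::import::{ingest_entries, Error, IngestionEntry};
use tvix_castore::proto::node::Node;

/// Hypothetical example: ingests a root directory `foo` containing a
/// single symlink `foo/bar`. Children come before their parent, the
/// root entry comes last and has exactly one path component.
async fn ingest_example<DS>(directory_service: DS) -> Result<Node, Error>
where
    DS: AsRef<dyn DirectoryService>,
{
    let entries: Vec<Result<IngestionEntry<'static>, Error>> = vec![
        Ok(IngestionEntry::Symlink {
            path: PathBuf::from("foo/bar"),
            target: PathBuf::from("/somewhere/else"),
        }),
        // The parent directory is yielded only after all of its children.
        Ok(IngestionEntry::Dir {
            path: PathBuf::from("foo"),
        }),
    ];

    // `ingest_entries` drains the stream, uploads the assembled
    // Directory messages and returns the root node.
    ingest_entries(directory_service, futures::stream::iter(entries)).await
}
```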

@@ -1,9 +1,7 @@
//! Implements builtins used to import paths in the store.
use crate::builtins::errors::ImportError;
use futures::pin_mut;
use std::path::Path;
use tvix_castore::import::leveled_entries_to_stream;
use tvix_eval::{
builtin_macros::builtins,
generators::{self, GenCo},
@@ -18,17 +16,15 @@ async fn filtered_ingest(
path: &Path,
filter: Option<&Value>,
) -> Result<tvix_castore::proto::node::Node, ErrorKind> {
// produce the leveled-key vector of DirEntry.
let mut entries_per_depths: Vec<Vec<walkdir::DirEntry>> = vec![Vec::new()];
let mut entries: Vec<walkdir::DirEntry> = vec![];
let mut it = walkdir::WalkDir::new(path)
.follow_links(false)
.follow_root_links(false)
.contents_first(false)
.sort_by_file_name()
.into_iter();
// Skip root node.
entries_per_depths[0].push(
entries.push(
it.next()
.ok_or_else(|| ErrorKind::IO {
path: Some(path.to_path_buf()),
@@ -85,28 +81,14 @@ async fn filtered_ingest(
continue;
}
if entry.depth() >= entries_per_depths.len() {
debug_assert!(
entry.depth() == entries_per_depths.len(),
"Received unexpected entry with depth {} during descent, previously at {}",
entry.depth(),
entries_per_depths.len()
);
entries_per_depths.push(vec![entry]);
} else {
entries_per_depths[entry.depth()].push(entry);
}
// FUTUREWORK: determine when it's the right moment to flush a level to the ingester.
entries.push(entry);
}
let direntry_stream = leveled_entries_to_stream(entries_per_depths);
pin_mut!(direntry_stream);
let entries_iter = entries.into_iter().rev().map(Ok);
state.tokio_handle.block_on(async {
state
.ingest_entries(direntry_stream)
.ingest_dir_entries(entries_iter, path)
.await
.map_err(|err| ErrorKind::IO {
path: Some(path.to_path_buf()),

@@ -2,13 +2,11 @@
use async_recursion::async_recursion;
use bytes::Bytes;
use futures::Stream;
use futures::{StreamExt, TryStreamExt};
use nix_compat::nixhash::NixHash;
use nix_compat::store_path::{build_ca_path, StorePathRef};
use nix_compat::{nixhash::CAHash, store_path::StorePath};
use sha2::{Digest, Sha256};
use std::marker::Unpin;
use std::rc::Rc;
use std::{
cell::RefCell,
@@ -20,6 +18,7 @@ use std::{
use tokio_util::io::SyncIoBridge;
use tracing::{error, instrument, warn, Level};
use tvix_build::buildservice::BuildService;
use tvix_castore::import::dir_entry_iter_to_ingestion_stream;
use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO};
use tvix_store::utils::AsyncIoBridge;
use walkdir::DirEntry;
@@ -278,17 +277,19 @@ impl TvixStoreIO {
/// This forwards the ingestion to the [`tvix_castore::import::ingest_entries`],
/// passing the blob_service and directory_service that's used.
/// The error is mapped to std::io::Error for simplicity.
pub(crate) async fn ingest_entries<S>(&self, entries_stream: S) -> io::Result<Node>
pub(crate) async fn ingest_dir_entries<'a, I>(
&'a self,
iter: I,
root: &Path,
) -> io::Result<Node>
where
S: Stream<Item = DirEntry> + Unpin,
I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + Sync + 'a,
{
tvix_castore::import::ingest_entries(
&self.blob_service,
&self.directory_service,
entries_stream,
)
.await
.map_err(|err| std::io::Error::new(io::ErrorKind::Other, err))
let entries_stream = dir_entry_iter_to_ingestion_stream(&self.blob_service, iter, root);
tvix_castore::import::ingest_entries(&self.directory_service, entries_stream)
.await
.map_err(|err| std::io::Error::new(io::ErrorKind::Other, err))
}
pub(crate) async fn node_to_path_info(

@@ -5,6 +5,7 @@ use futures::future::try_join_all;
use nix_compat::path_info::ExportedPathInfo;
use serde::Deserialize;
use serde::Serialize;
use std::os::unix::ffi::OsStrExt;
use std::path::PathBuf;
use std::sync::Arc;
use tokio_listener::Listener;
@@ -430,9 +431,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
let path: PathBuf = elem.path.to_absolute_path().into();
let root_name = path.file_name().unwrap().as_bytes().to_vec().into();
// Ingest the given path
let root_node =
ingest_path(blob_service.clone(), directory_service.clone(), path).await?;
let root_node = ingest_path(blob_service.clone(), directory_service.clone(), &path)
.await?
.rename(root_name);
// Create and upload a PathInfo pointing to the root_node,
// annotated with information we have from the reference graph.

@@ -112,12 +112,12 @@ pub async fn import_path_as_nar_ca<BS, DS, PS, P>(
) -> Result<StorePath, std::io::Error>
where
P: AsRef<Path> + std::fmt::Debug,
BS: AsRef<dyn BlobService> + Clone,
BS: BlobService + Clone,
DS: AsRef<dyn DirectoryService>,
PS: AsRef<dyn PathInfoService>,
{
let root_node =
tvix_castore::import::ingest_path(blob_service, directory_service, &path).await?;
tvix_castore::import::ingest_path(blob_service, directory_service, path.as_ref()).await?;
// Ask the PathInfoService for the NAR size and sha256
let (nar_size, nar_sha256) = path_info_service.as_ref().calculate_nar(&root_node).await?;
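
After this change a caller passes the blob service by value (it is cloned
internally for the per-file upload futures) instead of an
`AsRef<dyn BlobService>`. Below is a hedged sketch of such a caller,
mirroring how the tvix-store CLI above names the root node; the
`ingest_and_name` helper and its path handling are made up:

```rust
use std::os::unix::ffi::OsStrExt;
use std::path::Path;

use tvix_castore::blobservice::BlobService;
use tvix_castore::directoryservice::DirectoryService;
use tvix_castore::import::ingest_path;
use tvix_castore::proto::node::Node;

/// Hypothetical caller: ingests `path` and renames the root node after
/// the final path component, like the tvix-store CLI does above.
async fn ingest_and_name<BS, DS>(
    blob_service: BS,
    directory_service: DS,
    path: &Path,
) -> Result<Node, std::io::Error>
where
    // The blob service is taken by value and cloned for per-file
    // uploads, hence the new `BlobService + Clone` bound.
    BS: BlobService + Clone,
    DS: AsRef<dyn DirectoryService>,
{
    let root_name: bytes::Bytes = path
        .file_name()
        .expect("path must have a final component")
        .as_bytes()
        .to_vec()
        .into();

    // The import error converts into std::io::Error via the From impl
    // shown further up, so `?` is enough here.
    let root_node = ingest_path(blob_service, directory_service, path)
        .await?
        .rename(root_name);

    Ok(root_node)
}
```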