refactor(tvix/castore): switch to ingest_entries
for tarball ingestion
With `ingest_entries` being more generalized, we can now use it for ingesting the directory entries generated from tarballs. Change-Id: Ie1f7a915c456045762e05fcc9af45771f121eb43 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11489 Reviewed-by: flokli <flokli@flokli.de> Autosubmit: Connor Brewster <cbrewster@hey.com> Tested-by: BuildkiteCI
This commit is contained in:
parent
49b63fceee
commit
fa69becf4d
3 changed files with 242 additions and 131 deletions
|
@ -1,21 +1,17 @@
|
|||
#[cfg(target_family = "unix")]
|
||||
use std::os::unix::ffi::OsStrExt;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
use std::{collections::HashMap, path::PathBuf};
|
||||
|
||||
use petgraph::graph::{DiGraph, NodeIndex};
|
||||
use petgraph::visit::{DfsPostOrder, EdgeRef};
|
||||
use petgraph::Direction;
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio_stream::StreamExt;
|
||||
use tokio_tar::Archive;
|
||||
use tracing::{instrument, Level};
|
||||
use tracing::{instrument, warn, Level};
|
||||
|
||||
use crate::{
|
||||
blobservice::BlobService,
|
||||
directoryservice::{DirectoryPutter, DirectoryService},
|
||||
import::Error,
|
||||
proto::{node::Node, Directory, DirectoryNode, FileNode, SymlinkNode},
|
||||
};
|
||||
use crate::blobservice::BlobService;
|
||||
use crate::directoryservice::DirectoryService;
|
||||
use crate::import::{ingest_entries, Error, IngestionEntry};
|
||||
use crate::proto::node::Node;
|
||||
|
||||
/// Ingests elements from the given tar [`Archive`] into the passed [`BlobService`] and
|
||||
/// [`DirectoryService`].
|
||||
|
@ -35,23 +31,13 @@ where
|
|||
// contents and entries to meet the requirements of the castore.
|
||||
|
||||
// In the first phase, collect up all the regular files and symlinks.
|
||||
let mut paths = HashMap::new();
|
||||
let mut entries = archive.entries().map_err(Error::Archive)?;
|
||||
while let Some(mut entry) = entries.try_next().await.map_err(Error::Archive)? {
|
||||
let path = entry.path().map_err(Error::Archive)?.into_owned();
|
||||
let name = path
|
||||
.file_name()
|
||||
.ok_or_else(|| {
|
||||
Error::Archive(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidInput,
|
||||
"invalid filename in archive",
|
||||
))
|
||||
})?
|
||||
.as_bytes()
|
||||
.to_vec()
|
||||
.into();
|
||||
let mut nodes = IngestionEntryGraph::new();
|
||||
|
||||
let node = match entry.header().entry_type() {
|
||||
let mut entries_iter = archive.entries().map_err(Error::Archive)?;
|
||||
while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::Archive)? {
|
||||
let path: PathBuf = entry.path().map_err(Error::Archive)?.into();
|
||||
|
||||
let entry = match entry.header().entry_type() {
|
||||
tokio_tar::EntryType::Regular
|
||||
| tokio_tar::EntryType::GNUSparse
|
||||
| tokio_tar::EntryType::Continuous => {
|
||||
|
@ -62,140 +48,257 @@ where
|
|||
.await
|
||||
.map_err(Error::Archive)?;
|
||||
let digest = writer.close().await.map_err(Error::Archive)?;
|
||||
Node::File(FileNode {
|
||||
name,
|
||||
digest: digest.into(),
|
||||
|
||||
IngestionEntry::Regular {
|
||||
path,
|
||||
size,
|
||||
executable: entry.header().mode().map_err(Error::Archive)? & 64 != 0,
|
||||
})
|
||||
digest,
|
||||
}
|
||||
}
|
||||
tokio_tar::EntryType::Symlink => Node::Symlink(SymlinkNode {
|
||||
name,
|
||||
tokio_tar::EntryType::Symlink => IngestionEntry::Symlink {
|
||||
target: entry
|
||||
.link_name()
|
||||
.map_err(Error::Archive)?
|
||||
.expect("symlink missing target")
|
||||
.as_os_str()
|
||||
.as_bytes()
|
||||
.to_vec()
|
||||
.ok_or_else(|| Error::MissingSymlinkTarget(path.clone()))?
|
||||
.into(),
|
||||
}),
|
||||
path,
|
||||
},
|
||||
// Push a bogus directory marker so we can make sure this directory gets
|
||||
// created. We don't know the digest and size until after reading the full
|
||||
// tarball.
|
||||
tokio_tar::EntryType::Directory => Node::Directory(DirectoryNode {
|
||||
name,
|
||||
digest: Default::default(),
|
||||
size: 0,
|
||||
}),
|
||||
tokio_tar::EntryType::Directory => IngestionEntry::Dir { path: path.clone() },
|
||||
|
||||
tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => continue,
|
||||
|
||||
entry_type => return Err(Error::UnsupportedTarEntry(path, entry_type)),
|
||||
};
|
||||
|
||||
paths.insert(path, node);
|
||||
nodes.add(entry)?;
|
||||
}
|
||||
|
||||
// In the second phase, construct all of the directories.
|
||||
ingest_entries(
|
||||
directory_service,
|
||||
futures::stream::iter(nodes.finalize()?.into_iter().map(Ok)),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
// Collect into a list and then sort so all entries in the same directory
|
||||
// are next to each other.
|
||||
// We can detect boundaries between each directories to determine
|
||||
// when to construct or push directory entries.
|
||||
let mut ordered_paths = paths.into_iter().collect::<Vec<_>>();
|
||||
ordered_paths.sort_by(|a, b| a.0.cmp(&b.0));
|
||||
/// Keep track of the directory structure of a file tree being ingested. This is used
|
||||
/// for ingestion sources which do not provide any ordering or uniqueness guarantees
|
||||
/// like tarballs.
|
||||
///
|
||||
/// If we ingest multiple entries with the same paths and both entries are not directories,
|
||||
/// the newer entry will replace the older entry, disconnecting the old node's children
|
||||
/// from the graph.
|
||||
///
|
||||
/// Once all nodes are ingested a call to [IngestionEntryGraph::finalize] will return
|
||||
/// a list of entries computed by performing a DFS post order traversal of the graph
|
||||
/// from the top-level directory entry.
|
||||
///
|
||||
/// This expects the directory structure to contain a single top-level directory entry.
|
||||
/// An error is returned if this is not the case and ingestion will fail.
|
||||
struct IngestionEntryGraph {
|
||||
graph: DiGraph<IngestionEntry, ()>,
|
||||
path_to_index: HashMap<PathBuf, NodeIndex>,
|
||||
root_node: Option<NodeIndex>,
|
||||
}
|
||||
|
||||
let mut directory_putter = directory_service.as_ref().put_multiple_start();
|
||||
impl Default for IngestionEntryGraph {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
// Start with an initial directory at the root.
|
||||
let mut dir_stack = vec![(PathBuf::from(""), Directory::default())];
|
||||
impl IngestionEntryGraph {
|
||||
/// Creates a new ingestion entry graph.
|
||||
pub fn new() -> Self {
|
||||
IngestionEntryGraph {
|
||||
graph: DiGraph::new(),
|
||||
path_to_index: HashMap::new(),
|
||||
root_node: None,
|
||||
}
|
||||
}
|
||||
|
||||
async fn pop_directory(
|
||||
dir_stack: &mut Vec<(PathBuf, Directory)>,
|
||||
directory_putter: &mut Box<dyn DirectoryPutter>,
|
||||
) -> Result<DirectoryNode, Error> {
|
||||
let (path, directory) = dir_stack.pop().unwrap();
|
||||
/// Adds a new entry to the graph. Parent directories are automatically inserted.
|
||||
/// If a node exists in the graph with the same name as the new entry and both the old
|
||||
/// and new nodes are not directories, the node is replaced and is disconnected from its
|
||||
/// children.
|
||||
pub fn add(&mut self, entry: IngestionEntry) -> Result<NodeIndex, Error> {
|
||||
let path = entry.path().to_path_buf();
|
||||
|
||||
directory
|
||||
.validate()
|
||||
.map_err(|e| Error::InvalidDirectory(path.to_path_buf(), e))?;
|
||||
let index = match self.path_to_index.get(entry.path()) {
|
||||
Some(&index) => {
|
||||
// If either the old entry or new entry are not directories, we'll replace the old
|
||||
// entry.
|
||||
if !entry.is_dir() || !self.get_node(index).is_dir() {
|
||||
self.replace_node(index, entry);
|
||||
}
|
||||
|
||||
let dir_node = DirectoryNode {
|
||||
name: path
|
||||
.file_name()
|
||||
.unwrap_or_default()
|
||||
.as_bytes()
|
||||
.to_vec()
|
||||
.into(),
|
||||
digest: directory.digest().into(),
|
||||
size: directory.size(),
|
||||
index
|
||||
}
|
||||
None => self.graph.add_node(entry),
|
||||
};
|
||||
|
||||
if let Some((_, parent)) = dir_stack.last_mut() {
|
||||
parent.directories.push(dir_node.clone());
|
||||
}
|
||||
|
||||
directory_putter.put(directory).await?;
|
||||
|
||||
Ok(dir_node)
|
||||
}
|
||||
|
||||
fn push_directories(path: &Path, dir_stack: &mut Vec<(PathBuf, Directory)>) {
|
||||
if path == dir_stack.last().unwrap().0 {
|
||||
return;
|
||||
}
|
||||
if let Some(parent) = path.parent() {
|
||||
push_directories(parent, dir_stack);
|
||||
}
|
||||
dir_stack.push((path.to_path_buf(), Directory::default()));
|
||||
}
|
||||
|
||||
for (path, node) in ordered_paths.into_iter() {
|
||||
// Pop stack until the top dir is an ancestor of this entry.
|
||||
loop {
|
||||
let top = dir_stack.last().unwrap();
|
||||
if path.ancestors().any(|ancestor| ancestor == top.0) {
|
||||
break;
|
||||
// A path with 1 component is the root node
|
||||
if path.components().count() == 1 {
|
||||
// We expect archives to contain a single root node, if there is another root node
|
||||
// entry with a different path name, this is unsupported.
|
||||
if let Some(root_node) = self.root_node {
|
||||
if self.get_node(root_node).path() != path {
|
||||
return Err(Error::UnexpectedNumberOfTopLevelEntries);
|
||||
}
|
||||
}
|
||||
|
||||
pop_directory(&mut dir_stack, &mut directory_putter).await?;
|
||||
self.root_node = Some(index)
|
||||
} else if let Some(parent_path) = path.parent() {
|
||||
// Recursively add the parent node until it hits the root node.
|
||||
let parent_index = self.add(IngestionEntry::Dir {
|
||||
path: parent_path.to_path_buf(),
|
||||
})?;
|
||||
|
||||
// Insert an edge from the parent directory to the child entry.
|
||||
self.graph.add_edge(parent_index, index, ());
|
||||
}
|
||||
|
||||
// For directories, just ensure the directory node exists.
|
||||
if let Node::Directory(_) = node {
|
||||
push_directories(&path, &mut dir_stack);
|
||||
continue;
|
||||
self.path_to_index.insert(path, index);
|
||||
|
||||
Ok(index)
|
||||
}
|
||||
|
||||
/// Traverses the graph in DFS post order and collects the entries into a [Vec<IngestionEntry>].
|
||||
///
|
||||
/// Unreachable parts of the graph are not included in the result.
|
||||
pub fn finalize(self) -> Result<Vec<IngestionEntry>, Error> {
|
||||
// There must be a root node.
|
||||
let Some(root_node_index) = self.root_node else {
|
||||
return Err(Error::UnexpectedNumberOfTopLevelEntries);
|
||||
};
|
||||
|
||||
// The root node must be a directory.
|
||||
if !self.get_node(root_node_index).is_dir() {
|
||||
return Err(Error::UnexpectedNumberOfTopLevelEntries);
|
||||
}
|
||||
|
||||
// Push all ancestor directories onto the stack.
|
||||
push_directories(path.parent().unwrap(), &mut dir_stack);
|
||||
let mut traversal = DfsPostOrder::new(&self.graph, root_node_index);
|
||||
let mut nodes = Vec::with_capacity(self.graph.node_count());
|
||||
while let Some(node_index) = traversal.next(&self.graph) {
|
||||
nodes.push(self.get_node(node_index).clone());
|
||||
}
|
||||
|
||||
let top = dir_stack.last_mut().unwrap();
|
||||
debug_assert_eq!(Some(top.0.as_path()), path.parent());
|
||||
Ok(nodes)
|
||||
}
|
||||
|
||||
match node {
|
||||
Node::File(n) => top.1.files.push(n),
|
||||
Node::Symlink(n) => top.1.symlinks.push(n),
|
||||
// We already handled directories above.
|
||||
Node::Directory(_) => unreachable!(),
|
||||
/// Replaces the node with the specified entry. The node's children are disconnected.
|
||||
///
|
||||
/// This should never be called if both the old and new nodes are directories.
|
||||
fn replace_node(&mut self, index: NodeIndex, new_entry: IngestionEntry) {
|
||||
let entry = self
|
||||
.graph
|
||||
.node_weight_mut(index)
|
||||
.expect("Tvix bug: missing node entry");
|
||||
|
||||
debug_assert!(!(entry.is_dir() && new_entry.is_dir()));
|
||||
|
||||
// Replace the node itself.
|
||||
warn!(
|
||||
"saw duplicate entry in archive at path {:?}. old: {:?} new: {:?}",
|
||||
entry.path(),
|
||||
&entry,
|
||||
&new_entry
|
||||
);
|
||||
*entry = new_entry;
|
||||
|
||||
// Remove any outgoing edges to disconnect the old node's children.
|
||||
let edges = self
|
||||
.graph
|
||||
.edges_directed(index, Direction::Outgoing)
|
||||
.map(|edge| edge.id())
|
||||
.collect::<Vec<_>>();
|
||||
for edge in edges {
|
||||
self.graph.remove_edge(edge);
|
||||
}
|
||||
}
|
||||
|
||||
let mut root_node = None;
|
||||
while !dir_stack.is_empty() {
|
||||
// If the root directory only has 1 directory entry, we return the child entry
|
||||
// instead... weeeee
|
||||
if dir_stack.len() == 1 && dir_stack.last().unwrap().1.directories.len() == 1 {
|
||||
break;
|
||||
}
|
||||
root_node = Some(pop_directory(&mut dir_stack, &mut directory_putter).await?);
|
||||
fn get_node(&self, index: NodeIndex) -> &IngestionEntry {
|
||||
self.graph
|
||||
.node_weight(index)
|
||||
.expect("Tvix bug: missing node entry")
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use crate::import::IngestionEntry;
|
||||
use crate::B3Digest;
|
||||
|
||||
use super::{Error, IngestionEntryGraph};
|
||||
|
||||
use lazy_static::lazy_static;
|
||||
use rstest::rstest;
|
||||
|
||||
lazy_static! {
|
||||
pub static ref EMPTY_DIGEST: B3Digest = blake3::hash(&[]).as_bytes().into();
|
||||
pub static ref DIR_A: IngestionEntry = IngestionEntry::Dir { path: "a".into() };
|
||||
pub static ref DIR_B: IngestionEntry = IngestionEntry::Dir { path: "b".into() };
|
||||
pub static ref DIR_A_B: IngestionEntry = IngestionEntry::Dir { path: "a/b".into() };
|
||||
pub static ref FILE_A: IngestionEntry = IngestionEntry::Regular {
|
||||
path: "a".into(),
|
||||
size: 0,
|
||||
executable: false,
|
||||
digest: EMPTY_DIGEST.clone(),
|
||||
};
|
||||
pub static ref FILE_A_B: IngestionEntry = IngestionEntry::Regular {
|
||||
path: "a/b".into(),
|
||||
size: 0,
|
||||
executable: false,
|
||||
digest: EMPTY_DIGEST.clone(),
|
||||
};
|
||||
pub static ref FILE_A_B_C: IngestionEntry = IngestionEntry::Regular {
|
||||
path: "a/b/c".into(),
|
||||
size: 0,
|
||||
executable: false,
|
||||
digest: EMPTY_DIGEST.clone(),
|
||||
};
|
||||
}
|
||||
|
||||
#[rstest]
|
||||
#[case::implicit_directories(&[&*FILE_A_B_C], &[&*FILE_A_B_C, &*DIR_A_B, &*DIR_A])]
|
||||
#[case::explicit_directories(&[&*DIR_A, &*DIR_A_B, &*FILE_A_B_C], &[&*FILE_A_B_C, &*DIR_A_B, &*DIR_A])]
|
||||
#[case::inaccesible_tree(&[&*DIR_A, &*DIR_A_B, &*FILE_A_B], &[&*FILE_A_B, &*DIR_A])]
|
||||
fn node_ingestion_success(
|
||||
#[case] in_entries: &[&IngestionEntry],
|
||||
#[case] exp_entries: &[&IngestionEntry],
|
||||
) {
|
||||
let mut nodes = IngestionEntryGraph::new();
|
||||
|
||||
for entry in in_entries {
|
||||
nodes.add((*entry).clone()).expect("failed to add entry");
|
||||
}
|
||||
|
||||
let entries = nodes.finalize().expect("invalid entries");
|
||||
|
||||
let exp_entries: Vec<IngestionEntry> =
|
||||
exp_entries.iter().map(|entry| (*entry).clone()).collect();
|
||||
|
||||
assert_eq!(entries, exp_entries);
|
||||
}
|
||||
|
||||
#[rstest]
|
||||
#[case::no_top_level_entries(&[], Error::UnexpectedNumberOfTopLevelEntries)]
|
||||
#[case::multiple_top_level_dirs(&[&*DIR_A, &*DIR_B], Error::UnexpectedNumberOfTopLevelEntries)]
|
||||
#[case::top_level_file_entry(&[&*FILE_A], Error::UnexpectedNumberOfTopLevelEntries)]
|
||||
fn node_ingestion_error(#[case] in_entries: &[&IngestionEntry], #[case] exp_error: Error) {
|
||||
let mut nodes = IngestionEntryGraph::new();
|
||||
|
||||
let result = (|| {
|
||||
for entry in in_entries {
|
||||
nodes.add((*entry).clone())?;
|
||||
}
|
||||
nodes.finalize()
|
||||
})();
|
||||
|
||||
let error = result.expect_err("expected error");
|
||||
assert_eq!(error.to_string(), exp_error.to_string());
|
||||
}
|
||||
let root_node = root_node.expect("no root node");
|
||||
|
||||
let root_digest = directory_putter.close().await?;
|
||||
|
||||
debug_assert_eq!(root_digest.as_slice(), &root_node.digest);
|
||||
|
||||
Ok(Node::Directory(root_node))
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use std::{fs::FileType, path::PathBuf};
|
||||
|
||||
use crate::{proto::ValidateDirectoryError, Error as CastoreError};
|
||||
use crate::Error as CastoreError;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum Error {
|
||||
|
@ -25,11 +25,14 @@ pub enum Error {
|
|||
#[error("unsupported file {0} type: {1:?}")]
|
||||
UnsupportedFileType(PathBuf, FileType),
|
||||
|
||||
#[error("invalid directory contents {0}: {1}")]
|
||||
InvalidDirectory(PathBuf, ValidateDirectoryError),
|
||||
|
||||
#[error("unsupported tar entry {0} type: {1:?}")]
|
||||
UnsupportedTarEntry(PathBuf, tokio_tar::EntryType),
|
||||
|
||||
#[error("symlink missing target {0}")]
|
||||
MissingSymlinkTarget(PathBuf),
|
||||
|
||||
#[error("unexpected number of top level directory entries")]
|
||||
UnexpectedNumberOfTopLevelEntries,
|
||||
}
|
||||
|
||||
impl From<CastoreError> for Error {
|
||||
|
|
|
@ -199,6 +199,7 @@ where
|
|||
Ok(digest)
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
pub enum IngestionEntry {
|
||||
Regular {
|
||||
path: PathBuf,
|
||||
|
@ -228,4 +229,8 @@ impl IngestionEntry {
|
|||
IngestionEntry::Unknown { path, .. } => path,
|
||||
}
|
||||
}
|
||||
|
||||
fn is_dir(&self) -> bool {
|
||||
matches!(self, IngestionEntry::Dir { .. })
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue