refactor(tvix/castore): use Directory struct separate from proto one

This uses our own data type to deal with Directories in the castore model.

It makes some undesired states unrepresentable, removing the need for conversions and checking in various places:

 - In the protobuf, blake3 digests could have a wrong length, as proto doesn't know fixed-size fields. We now use `B3Digest`, which makes cloning cheaper, and removes the need to do size-checking everywhere.
 - In the protobuf, we had three different lists for `files`, `symlinks` and `directories`. This was mostly a protobuf size optimization, but made interacting with them a bit awkward. This has now been replaced with a list of enums, and convenience iterators to get various nodes, and add new ones.

Change-Id: I7b92691bb06d77ff3f58a5ccea94a22c16f84f04
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12057
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
This commit is contained in:
Yureka 2024-07-29 14:34:50 +02:00 committed by yuka
parent 5d3f3158d6
commit 3ca0b53840
53 changed files with 1429 additions and 1377 deletions

View file

@ -1,7 +1,9 @@
use std::path::{Path, PathBuf};
use itertools::Itertools;
use tvix_castore::proto::{NamedNode, ValidateNodeError};
use tvix_castore::directoryservice::NamedNode;
use tvix_castore::directoryservice::Node;
use tvix_castore::ValidateNodeError;
mod grpc_buildservice_wrapper;
@ -123,18 +125,17 @@ impl BuildRequest {
/// and all restrictions around paths themselves (relative, clean, …) need
// to be fulfilled.
pub fn validate(&self) -> Result<(), ValidateBuildRequestError> {
// validate all input nodes
for (i, n) in self.inputs.iter().enumerate() {
// ensure the input node itself is valid
n.validate()
.map_err(|e| ValidateBuildRequestError::InvalidInputNode(i, e))?;
}
// now we can look at the names, and make sure they're sorted.
if !is_sorted(
self.inputs
.iter()
.map(|e| e.node.as_ref().unwrap().get_name()),
// TODO(flokli) handle conversion errors and store result somewhere
.map(|e| {
Node::try_from(e.node.as_ref().unwrap())
.unwrap()
.get_name()
.clone()
}),
) {
Err(ValidateBuildRequestError::InputNodesNotSorted)?
}

View file

@ -6,7 +6,7 @@ use thiserror::Error;
pub struct B3Digest(Bytes);
// TODO: allow converting these errors to crate::Error
#[derive(Error, Debug)]
#[derive(Error, Debug, PartialEq)]
pub enum Error {
#[error("invalid digest length: {0}")]
InvalidDigestLen(usize),

View file

@ -9,7 +9,9 @@ use std::sync::Arc;
use tonic::async_trait;
use tracing::{instrument, trace, warn};
use super::{utils::traverse_directory, DirectoryPutter, DirectoryService, SimplePutter};
use super::{
utils::traverse_directory, Directory, DirectoryPutter, DirectoryService, SimplePutter,
};
use crate::composition::{CompositionContext, ServiceBuilder};
use crate::{proto, B3Digest, Error};
@ -149,7 +151,7 @@ fn derive_directory_key(digest: &B3Digest) -> String {
#[async_trait]
impl DirectoryService for BigtableDirectoryService {
#[instrument(skip(self, digest), err, fields(directory.digest = %digest))]
async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
let mut client = self.client.clone();
let directory_key = derive_directory_key(digest);
@ -241,28 +243,20 @@ impl DirectoryService for BigtableDirectoryService {
// Try to parse the value into a Directory message.
let directory = proto::Directory::decode(Bytes::from(row_cell.value))
.map_err(|e| Error::StorageError(format!("unable to decode directory proto: {}", e)))?;
// validate the Directory.
directory
.validate()
.map_err(|e| Error::StorageError(format!("unable to decode directory proto: {}", e)))?
.try_into()
.map_err(|e| Error::StorageError(format!("invalid Directory message: {}", e)))?;
Ok(Some(directory))
}
#[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest()))]
async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
let directory_digest = directory.digest();
let mut client = self.client.clone();
let directory_key = derive_directory_key(&directory_digest);
// Ensure the directory we're trying to upload passes validation
directory
.validate()
.map_err(|e| Error::InvalidRequest(format!("directory is invalid: {}", e)))?;
let data = directory.encode_to_vec();
let data = proto::Directory::from(directory).encode_to_vec();
if data.len() as u64 > CELL_SIZE_LIMIT {
return Err(Error::StorageError(
"Directory exceeds cell limit on Bigtable".into(),
@ -310,7 +304,7 @@ impl DirectoryService for BigtableDirectoryService {
fn get_recursive(
&self,
root_directory_digest: &B3Digest,
) -> BoxStream<'static, Result<proto::Directory, Error>> {
) -> BoxStream<'static, Result<Directory, Error>> {
traverse_directory(self.clone(), root_directory_digest)
}

View file

@ -7,10 +7,9 @@ use futures::TryStreamExt;
use tonic::async_trait;
use tracing::{instrument, trace};
use super::{DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter};
use super::{Directory, DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter};
use crate::composition::{CompositionContext, ServiceBuilder};
use crate::directoryservice::DirectoryPutter;
use crate::proto;
use crate::B3Digest;
use crate::Error;
@ -40,7 +39,7 @@ where
DS2: DirectoryService + Clone + 'static,
{
#[instrument(skip(self, digest), fields(directory.digest = %digest))]
async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
match self.near.get(digest).await? {
Some(directory) => {
trace!("serving from cache");
@ -82,7 +81,7 @@ where
}
#[instrument(skip_all)]
async fn put(&self, _directory: proto::Directory) -> Result<B3Digest, Error> {
async fn put(&self, _directory: Directory) -> Result<B3Digest, Error> {
Err(Error::StorageError("unimplemented".to_string()))
}
@ -90,7 +89,7 @@ where
fn get_recursive(
&self,
root_directory_digest: &B3Digest,
) -> BoxStream<'static, Result<proto::Directory, Error>> {
) -> BoxStream<'static, Result<Directory, Error>> {
let near = self.near.clone();
let far = self.far.clone();
let digest = root_directory_digest.clone();

View file

@ -10,10 +10,8 @@ use petgraph::{
use tracing::instrument;
use super::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator};
use crate::{
proto::{self, Directory, DirectoryNode},
B3Digest,
};
use super::{Directory, DirectoryNode};
use crate::B3Digest;
#[derive(thiserror::Error, Debug)]
pub enum Error {
@ -88,7 +86,7 @@ fn check_edge(dir: &DirectoryNode, child: &Directory) -> Result<(), Error> {
impl DirectoryGraph<LeavesToRootValidator> {
/// Insert a new Directory into the closure
#[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)]
pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> {
pub fn add(&mut self, directory: Directory) -> Result<(), Error> {
if !self.order_validator.add_directory(&directory) {
return Err(Error::ValidationError(
"unknown directory was referenced".into(),
@ -108,7 +106,7 @@ impl DirectoryGraph<RootToLeavesValidator> {
/// Insert a new Directory into the closure
#[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)]
pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> {
pub fn add(&mut self, directory: Directory) -> Result<(), Error> {
let digest = directory.digest();
if !self.order_validator.digest_allowed(&digest) {
return Err(Error::ValidationError("unexpected digest".into()));
@ -129,12 +127,7 @@ impl<O: OrderValidator> DirectoryGraph<O> {
}
/// Adds a directory which has already been confirmed to be in-order to the graph
pub fn add_order_unchecked(&mut self, directory: proto::Directory) -> Result<(), Error> {
// Do some basic validation
directory
.validate()
.map_err(|e| Error::ValidationError(e.to_string()))?;
pub fn add_order_unchecked(&mut self, directory: Directory) -> Result<(), Error> {
let digest = directory.digest();
// Teach the graph about the existence of a node with this digest
@ -149,12 +142,10 @@ impl<O: OrderValidator> DirectoryGraph<O> {
}
// set up edges to all child directories
for subdir in &directory.directories {
let subdir_digest: B3Digest = subdir.digest.clone().try_into().unwrap();
for subdir in directory.directories() {
let child_ix = *self
.digest_to_node_ix
.entry(subdir_digest)
.entry(subdir.digest.clone())
.or_insert_with(|| self.graph.add_node(None));
let pending_edge_check = match &self.graph[child_ix] {
@ -266,37 +257,36 @@ impl ValidatedDirectoryGraph {
.filter_map(move |i| nodes[i.index()].weight.take())
}
}
#[cfg(test)]
mod tests {
use crate::{
fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C},
proto::{self, Directory},
};
use lazy_static::lazy_static;
use rstest::rstest;
lazy_static! {
/*
pub static ref BROKEN_DIRECTORY : Directory = Directory {
symlinks: vec![proto::SymlinkNode {
symlinks: vec![SymlinkNode {
name: "".into(), // invalid name!
target: "doesntmatter".into(),
}],
..Default::default()
};
pub static ref BROKEN_PARENT_DIRECTORY: Directory = Directory {
directories: vec![proto::DirectoryNode {
name: "foo".into(),
digest: DIRECTORY_A.digest().into(),
size: DIRECTORY_A.size() + 42, // wrong!
}],
..Default::default()
};
}
*/
#[cfg(test)]
mod tests {
use crate::directoryservice::{Directory, DirectoryNode, Node};
use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C};
use lazy_static::lazy_static;
use rstest::rstest;
use super::{DirectoryGraph, LeavesToRootValidator, RootToLeavesValidator};
lazy_static! {
pub static ref BROKEN_PARENT_DIRECTORY: Directory = {
let mut dir = Directory::new();
dir.add(Node::Directory(DirectoryNode::new(
"foo".into(),
DIRECTORY_A.digest(),
DIRECTORY_A.size() + 42, // wrong!
).unwrap())).unwrap();
dir
};
}
#[rstest]
/// Uploading an empty directory should succeed.
#[case::empty_directory(&[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))]
@ -312,8 +302,6 @@ mod tests {
#[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, None)]
/// Uploading B (referring to A) should fail immediately, because A was never uploaded.
#[case::dangling_pointer(&[&*DIRECTORY_B], true, None)]
/// Uploading a directory failing validation should fail immediately.
#[case::failing_validation(&[&*BROKEN_DIRECTORY], true, None)]
/// Uploading a directory which refers to another Directory with a wrong size should fail.
#[case::wrong_size_in_parent(&[&*DIRECTORY_A, &*BROKEN_PARENT_DIRECTORY], true, None)]
fn test_uploads(
@ -366,8 +354,6 @@ mod tests {
#[case::unconnected_node(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_B], true, None)]
/// Downloading B (specified as the root) but receiving A instead should fail immediately, because A has no connection to B (the root).
#[case::dangling_pointer(&*DIRECTORY_B, &[&*DIRECTORY_A], true, None)]
/// Downloading a directory failing validation should fail immediately.
#[case::failing_validation(&*BROKEN_DIRECTORY, &[&*BROKEN_DIRECTORY], true, None)]
/// Downloading a directory which refers to another Directory with a wrong size should fail.
#[case::wrong_size_in_parent(&*BROKEN_PARENT_DIRECTORY, &[&*BROKEN_PARENT_DIRECTORY, &*DIRECTORY_A], true, None)]
fn test_downloads(

View file

@ -1,8 +1,9 @@
use std::collections::HashSet;
use super::{DirectoryPutter, DirectoryService};
use super::{Directory, DirectoryPutter, DirectoryService};
use crate::composition::{CompositionContext, ServiceBuilder};
use crate::proto::{self, get_directory_request::ByWhat};
use crate::ValidateDirectoryError;
use crate::{B3Digest, Error};
use async_stream::try_stream;
use futures::stream::BoxStream;
@ -41,10 +42,7 @@ where
T::Future: Send,
{
#[instrument(level = "trace", skip_all, fields(directory.digest = %digest))]
async fn get(
&self,
digest: &B3Digest,
) -> Result<Option<crate::proto::Directory>, crate::Error> {
async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, crate::Error> {
// Get a new handle to the gRPC client, and copy the digest.
let mut grpc_client = self.grpc_client.clone();
let digest_cpy = digest.clone();
@ -72,15 +70,10 @@ where
"requested directory with digest {}, but got {}",
digest, actual_digest
)))
} else if let Err(e) = directory.validate() {
// Validate the Directory itself is valid.
warn!("directory failed validation: {}", e.to_string());
Err(crate::Error::StorageError(format!(
"directory {} failed validation: {}",
digest, e,
)))
} else {
Ok(Some(directory))
Ok(Some(directory.try_into().map_err(|_| {
Error::StorageError("invalid root digest length in response".to_string())
})?))
}
}
Ok(None) => Ok(None),
@ -90,11 +83,11 @@ where
}
#[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest()))]
async fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> {
async fn put(&self, directory: Directory) -> Result<B3Digest, crate::Error> {
let resp = self
.grpc_client
.clone()
.put(tokio_stream::once(directory))
.put(tokio_stream::once(proto::Directory::from(directory)))
.await;
match resp {
@ -113,7 +106,7 @@ where
fn get_recursive(
&self,
root_directory_digest: &B3Digest,
) -> BoxStream<'static, Result<proto::Directory, Error>> {
) -> BoxStream<'static, Result<Directory, Error>> {
let mut grpc_client = self.grpc_client.clone();
let root_directory_digest = root_directory_digest.clone();
@ -135,14 +128,6 @@ where
loop {
match stream.message().await {
Ok(Some(directory)) => {
// validate the directory itself.
if let Err(e) = directory.validate() {
Err(crate::Error::StorageError(format!(
"directory {} failed validation: {}",
directory.digest(),
e,
)))?;
}
// validate we actually expected that directory, and move it from expected to received.
let directory_digest = directory.digest();
let was_expected = expected_directory_digests.remove(&directory_digest);
@ -168,6 +153,9 @@ where
.insert(child_directory_digest);
}
let directory = directory.try_into()
.map_err(|e: ValidateDirectoryError| Error::StorageError(e.to_string()))?;
yield directory;
},
Ok(None) if expected_directory_digests.len() == 1 && expected_directory_digests.contains(&root_directory_digest) => {
@ -279,11 +267,11 @@ pub struct GRPCPutter {
#[async_trait]
impl DirectoryPutter for GRPCPutter {
#[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
async fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> {
async fn put(&mut self, directory: Directory) -> Result<(), crate::Error> {
match self.rq {
// If we're not already closed, send the directory to directory_sender.
Some((_, ref directory_sender)) => {
if directory_sender.send(directory).is_err() {
if directory_sender.send(directory.into()).is_err() {
// If the channel has been prematurely closed, invoke close (so we can peek at the error code)
// That error code is much more helpful, because it
// contains the error message from the server.

View file

@ -1,4 +1,4 @@
use crate::{proto, B3Digest, Error};
use crate::{B3Digest, Error};
use futures::stream::BoxStream;
use std::collections::HashMap;
use std::sync::Arc;
@ -7,8 +7,9 @@ use tonic::async_trait;
use tracing::{instrument, warn};
use super::utils::traverse_directory;
use super::{DirectoryPutter, DirectoryService, SimplePutter};
use super::{Directory, DirectoryPutter, DirectoryService, SimplePutter};
use crate::composition::{CompositionContext, ServiceBuilder};
use crate::proto;
#[derive(Clone, Default)]
pub struct MemoryDirectoryService {
@ -18,7 +19,7 @@ pub struct MemoryDirectoryService {
#[async_trait]
impl DirectoryService for MemoryDirectoryService {
#[instrument(skip(self, digest), fields(directory.digest = %digest))]
async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
let db = self.db.read().await;
match db.get(digest) {
@ -37,35 +38,20 @@ impl DirectoryService for MemoryDirectoryService {
)));
}
// Validate the Directory itself is valid.
if let Err(e) = directory.validate() {
warn!("directory failed validation: {}", e.to_string());
return Err(Error::StorageError(format!(
"directory {} failed validation: {}",
actual_digest, e,
)));
}
Ok(Some(directory.clone()))
Ok(Some(directory.clone().try_into().map_err(|e| {
crate::Error::StorageError(format!("corrupted directory: {}", e))
})?))
}
}
}
#[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
let digest = directory.digest();
// validate the directory itself.
if let Err(e) = directory.validate() {
return Err(Error::InvalidRequest(format!(
"directory {} failed validation: {}",
digest, e,
)));
}
// store it
let mut db = self.db.write().await;
db.insert(digest.clone(), directory);
db.insert(digest.clone(), directory.into());
Ok(digest)
}
@ -74,7 +60,7 @@ impl DirectoryService for MemoryDirectoryService {
fn get_recursive(
&self,
root_directory_digest: &B3Digest,
) -> BoxStream<'static, Result<proto::Directory, Error>> {
) -> BoxStream<'static, Result<Directory, Error>> {
traverse_directory(self.clone(), root_directory_digest)
}

View file

@ -1,6 +1,9 @@
use crate::composition::{Registry, ServiceBuilder};
use crate::{proto, B3Digest, Error};
use crate::proto;
use crate::{B3Digest, Error};
use crate::{ValidateDirectoryError, ValidateNodeError};
use bytes::Bytes;
use futures::stream::BoxStream;
use tonic::async_trait;
mod combinators;
@ -38,7 +41,7 @@ mod bigtable;
pub use self::bigtable::{BigtableDirectoryService, BigtableParameters};
/// The base trait all Directory services need to implement.
/// This is a simple get and put of [crate::proto::Directory], returning their
/// This is a simple get and put of [Directory], returning their
/// digest.
#[async_trait]
pub trait DirectoryService: Send + Sync {
@ -50,14 +53,14 @@ pub trait DirectoryService: Send + Sync {
/// Directory digests that are at the "root", aka the last element that's
/// sent to a DirectoryPutter. This makes sense for implementations bundling
/// closures of directories together in batches.
async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>;
async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error>;
/// Uploads a single Directory message, and returns the calculated
/// digest, or an error. An error *must* also be returned if the message is
/// not valid.
async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>;
async fn put(&self, directory: Directory) -> Result<B3Digest, Error>;
/// Looks up a closure of [proto::Directory].
/// Ideally this would be a `impl Stream<Item = Result<proto::Directory, Error>>`,
/// Looks up a closure of [Directory].
/// Ideally this would be a `impl Stream<Item = Result<Directory, Error>>`,
/// and we'd be able to add a default implementation for it here, but
/// we can't have that yet.
///
@ -75,9 +78,9 @@ pub trait DirectoryService: Send + Sync {
fn get_recursive(
&self,
root_directory_digest: &B3Digest,
) -> BoxStream<'static, Result<proto::Directory, Error>>;
) -> BoxStream<'static, Result<Directory, Error>>;
/// Allows persisting a closure of [proto::Directory], which is a graph of
/// Allows persisting a closure of [Directory], which is a graph of
/// connected Directory messages.
fn put_multiple_start(&self) -> Box<dyn DirectoryPutter>;
}
@ -87,18 +90,18 @@ impl<A> DirectoryService for A
where
A: AsRef<dyn DirectoryService> + Send + Sync,
{
async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
self.as_ref().get(digest).await
}
async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
self.as_ref().put(directory).await
}
fn get_recursive(
&self,
root_directory_digest: &B3Digest,
) -> BoxStream<'static, Result<proto::Directory, Error>> {
) -> BoxStream<'static, Result<Directory, Error>> {
self.as_ref().get_recursive(root_directory_digest)
}
@ -107,7 +110,7 @@ where
}
}
/// Provides a handle to put a closure of connected [proto::Directory] elements.
/// Provides a handle to put a closure of connected [Directory] elements.
///
/// The consumer can periodically call [DirectoryPutter::put], starting from the
/// leaves. Once the root is reached, [DirectoryPutter::close] can be called to
@ -119,12 +122,12 @@ where
/// but a single file or symlink.
#[async_trait]
pub trait DirectoryPutter: Send {
/// Put a individual [proto::Directory] into the store.
/// Put a individual [Directory] into the store.
/// Error semantics and behaviour is up to the specific implementation of
/// this trait.
/// Due to bursting, the returned error might refer to an object previously
/// sent via `put`.
async fn put(&mut self, directory: proto::Directory) -> Result<(), Error>;
async fn put(&mut self, directory: Directory) -> Result<(), Error>;
/// Close the stream, and wait for any errors.
/// If there's been any invalid Directory message uploaded, and error *must*
@ -145,3 +148,461 @@ pub(crate) fn register_directory_services(reg: &mut Registry) {
reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::BigtableParameters>("bigtable");
}
}
/// A Directory can contain Directory, File or Symlink nodes.
/// Each of these nodes have a name attribute, which is the basename in that
/// directory and node type specific attributes.
/// While a Node by itself may have any name, the names of Directory entries:
/// - MUST not contain slashes or null bytes
/// - MUST not be '.' or '..'
/// - MUST be unique across all three lists
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct Directory {
nodes: Vec<Node>,
}
/// A DirectoryNode is a pointer to a [Directory], by its [Directory::digest].
/// It also gives it a `name` and `size`.
/// Such a node is either an element in the [Directory] it itself is contained in,
/// or a standalone root node./
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DirectoryNode {
/// The (base)name of the directory
name: Bytes,
/// The blake3 hash of a Directory message, serialized in protobuf canonical form.
digest: B3Digest,
/// Number of child elements in the Directory referred to by `digest`.
/// Calculated by summing up the numbers of nodes, and for each directory.
/// its size field. Can be used for inode allocation.
/// This field is precisely as verifiable as any other Merkle tree edge.
/// Resolve `digest`, and you can compute it incrementally. Resolve the entire
/// tree, and you can fully compute it from scratch.
/// A credulous implementation won't reject an excessive size, but this is
/// harmless: you'll have some ordinals without nodes. Undersizing is obvious
/// and easy to reject: you won't have an ordinal for some nodes.
size: u64,
}
impl DirectoryNode {
pub fn new(name: Bytes, digest: B3Digest, size: u64) -> Result<Self, ValidateNodeError> {
Ok(Self { name, digest, size })
}
pub fn digest(&self) -> &B3Digest {
&self.digest
}
pub fn size(&self) -> u64 {
self.size
}
}
/// A FileNode represents a regular or executable file in a Directory or at the root.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileNode {
/// The (base)name of the file
name: Bytes,
/// The blake3 digest of the file contents
digest: B3Digest,
/// The file content size
size: u64,
/// Whether the file is executable
executable: bool,
}
impl FileNode {
pub fn new(
name: Bytes,
digest: B3Digest,
size: u64,
executable: bool,
) -> Result<Self, ValidateNodeError> {
Ok(Self {
name,
digest,
size,
executable,
})
}
pub fn digest(&self) -> &B3Digest {
&self.digest
}
pub fn size(&self) -> u64 {
self.size
}
pub fn executable(&self) -> bool {
self.executable
}
}
/// A SymlinkNode represents a symbolic link in a Directory or at the root.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SymlinkNode {
/// The (base)name of the symlink
name: Bytes,
/// The target of the symlink.
target: Bytes,
}
impl SymlinkNode {
pub fn new(name: Bytes, target: Bytes) -> Result<Self, ValidateNodeError> {
if target.is_empty() || target.contains(&b'\0') {
return Err(ValidateNodeError::InvalidSymlinkTarget(target));
}
Ok(Self { name, target })
}
pub fn target(&self) -> &bytes::Bytes {
&self.target
}
}
/// A Node is either a [DirectoryNode], [FileNode] or [SymlinkNode].
/// While a Node by itself may have any name, only those matching specific requirements
/// can can be added as entries to a [Directory] (see the documentation on [Directory] for details).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Node {
Directory(DirectoryNode),
File(FileNode),
Symlink(SymlinkNode),
}
/// NamedNode is implemented for [FileNode], [DirectoryNode] and [SymlinkNode]
/// and [Node], so we can ask all of them for the name easily.
pub trait NamedNode {
fn get_name(&self) -> &bytes::Bytes;
}
impl NamedNode for &FileNode {
fn get_name(&self) -> &bytes::Bytes {
&self.name
}
}
impl NamedNode for FileNode {
fn get_name(&self) -> &bytes::Bytes {
&self.name
}
}
impl NamedNode for &DirectoryNode {
fn get_name(&self) -> &bytes::Bytes {
&self.name
}
}
impl NamedNode for DirectoryNode {
fn get_name(&self) -> &bytes::Bytes {
&self.name
}
}
impl NamedNode for &SymlinkNode {
fn get_name(&self) -> &bytes::Bytes {
&self.name
}
}
impl NamedNode for SymlinkNode {
fn get_name(&self) -> &bytes::Bytes {
&self.name
}
}
impl NamedNode for &Node {
fn get_name(&self) -> &bytes::Bytes {
match self {
Node::File(node_file) => &node_file.name,
Node::Directory(node_directory) => &node_directory.name,
Node::Symlink(node_symlink) => &node_symlink.name,
}
}
}
impl NamedNode for Node {
fn get_name(&self) -> &bytes::Bytes {
match self {
Node::File(node_file) => &node_file.name,
Node::Directory(node_directory) => &node_directory.name,
Node::Symlink(node_symlink) => &node_symlink.name,
}
}
}
impl Node {
/// Returns the node with a new name.
pub fn rename(self, name: bytes::Bytes) -> Self {
match self {
Node::Directory(n) => Node::Directory(DirectoryNode { name, ..n }),
Node::File(n) => Node::File(FileNode { name, ..n }),
Node::Symlink(n) => Node::Symlink(SymlinkNode { name, ..n }),
}
}
}
impl PartialOrd for Node {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for Node {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.get_name().cmp(other.get_name())
}
}
impl PartialOrd for FileNode {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for FileNode {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.get_name().cmp(other.get_name())
}
}
impl PartialOrd for DirectoryNode {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for DirectoryNode {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.get_name().cmp(other.get_name())
}
}
impl PartialOrd for SymlinkNode {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for SymlinkNode {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.get_name().cmp(other.get_name())
}
}
fn checked_sum(iter: impl IntoIterator<Item = u64>) -> Option<u64> {
iter.into_iter().try_fold(0u64, |acc, i| acc.checked_add(i))
}
impl Directory {
pub fn new() -> Self {
Directory { nodes: vec![] }
}
/// The size of a directory is the number of all regular and symlink elements,
/// the number of directory elements, and their size fields.
pub fn size(&self) -> u64 {
// It's impossible to create a Directory where the size overflows, because we
// check before every add() that the size won't overflow.
(self.nodes.len() as u64) + self.directories().map(|e| e.size).sum::<u64>()
}
/// Calculates the digest of a Directory, which is the blake3 hash of a
/// Directory protobuf message, serialized in protobuf canonical form.
pub fn digest(&self) -> B3Digest {
proto::Directory::from(self).digest()
}
/// Allows iterating over all nodes (directories, files and symlinks)
/// ordered by their name.
pub fn nodes(&self) -> impl Iterator<Item = &Node> + Send + Sync + '_ {
self.nodes.iter()
}
/// Allows iterating over the FileNode entries of this directory
/// ordered by their name
pub fn files(&self) -> impl Iterator<Item = &FileNode> + Send + Sync + '_ {
self.nodes.iter().filter_map(|node| match node {
Node::File(n) => Some(n),
_ => None,
})
}
/// Allows iterating over the subdirectories of this directory
/// ordered by their name
pub fn directories(&self) -> impl Iterator<Item = &DirectoryNode> + Send + Sync + '_ {
self.nodes.iter().filter_map(|node| match node {
Node::Directory(n) => Some(n),
_ => None,
})
}
/// Allows iterating over the SymlinkNode entries of this directory
/// ordered by their name
pub fn symlinks(&self) -> impl Iterator<Item = &SymlinkNode> + Send + Sync + '_ {
self.nodes.iter().filter_map(|node| match node {
Node::Symlink(n) => Some(n),
_ => None,
})
}
/// Checks a Node name for validity as a directory entry
/// We disallow slashes, null bytes, '.', '..' and the empty string.
pub(crate) fn validate_node_name(name: &[u8]) -> Result<(), ValidateNodeError> {
if name.is_empty()
|| name == b".."
|| name == b"."
|| name.contains(&0x00)
|| name.contains(&b'/')
{
Err(ValidateNodeError::InvalidName(name.to_owned().into()))
} else {
Ok(())
}
}
/// Adds the specified [Node] to the [Directory], preserving sorted entries.
///
/// Inserting an element that already exists with the same name in the directory will yield an
/// error.
/// Inserting an element will validate that its name fulfills the stricter requirements for
/// directory entries and yield an error if it is not.
pub fn add(&mut self, node: Node) -> Result<(), ValidateDirectoryError> {
Self::validate_node_name(node.get_name())
.map_err(|e| ValidateDirectoryError::InvalidNode(node.get_name().clone().into(), e))?;
// Check that the even after adding this new directory entry, the size calculation will not
// overflow
// FUTUREWORK: add some sort of batch add interface which only does this check once with
// all the to-be-added entries
checked_sum([
self.size(),
1,
match node {
Node::Directory(ref dir) => dir.size,
_ => 0,
},
])
.ok_or(ValidateDirectoryError::SizeOverflow)?;
// This assumes the [Directory] is sorted, since we don't allow accessing the nodes list
// directly and all previous inserts should have been in-order
let pos = match self
.nodes
.binary_search_by_key(&node.get_name(), |n| n.get_name())
{
Err(pos) => pos, // There is no node with this name; good!
Ok(_) => {
return Err(ValidateDirectoryError::DuplicateName(
node.get_name().to_vec(),
))
}
};
self.nodes.insert(pos, node);
Ok(())
}
}
#[cfg(test)]
mod test {
use super::{Directory, DirectoryNode, FileNode, Node, SymlinkNode};
use crate::fixtures::DUMMY_DIGEST;
use crate::ValidateDirectoryError;
#[test]
fn add_nodes_to_directory() {
let mut d = Directory::new();
d.add(Node::Directory(
DirectoryNode::new("b".into(), DUMMY_DIGEST.clone(), 1).unwrap(),
))
.unwrap();
d.add(Node::Directory(
DirectoryNode::new("a".into(), DUMMY_DIGEST.clone(), 1).unwrap(),
))
.unwrap();
d.add(Node::Directory(
DirectoryNode::new("z".into(), DUMMY_DIGEST.clone(), 1).unwrap(),
))
.unwrap();
d.add(Node::File(
FileNode::new("f".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(),
))
.unwrap();
d.add(Node::File(
FileNode::new("c".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(),
))
.unwrap();
d.add(Node::File(
FileNode::new("g".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(),
))
.unwrap();
d.add(Node::Symlink(
SymlinkNode::new("t".into(), "a".into()).unwrap(),
))
.unwrap();
d.add(Node::Symlink(
SymlinkNode::new("o".into(), "a".into()).unwrap(),
))
.unwrap();
d.add(Node::Symlink(
SymlinkNode::new("e".into(), "a".into()).unwrap(),
))
.unwrap();
// Convert to proto struct and back to ensure we are not generating any invalid structures
crate::directoryservice::Directory::try_from(crate::proto::Directory::from(d))
.expect("directory should be valid");
}
#[test]
fn validate_overflow() {
let mut d = Directory::new();
assert_eq!(
d.add(Node::Directory(
DirectoryNode::new("foo".into(), DUMMY_DIGEST.clone(), u64::MAX).unwrap(),
)),
Err(ValidateDirectoryError::SizeOverflow)
);
}
#[test]
fn add_duplicate_node_to_directory() {
let mut d = Directory::new();
d.add(Node::Directory(
DirectoryNode::new("a".into(), DUMMY_DIGEST.clone(), 1).unwrap(),
))
.unwrap();
assert_eq!(
format!(
"{}",
d.add(Node::File(
FileNode::new("a".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(),
))
.expect_err("adding duplicate dir entry must fail")
),
"\"a\" is a duplicate name"
);
}
/// Attempt to add a directory entry with a name which should be rejected.
#[tokio::test]
async fn directory_reject_invalid_name() {
let mut dir = Directory::new();
assert!(
dir.add(Node::Symlink(
SymlinkNode::new(
"".into(), // wrong! can not be added to directory
"doesntmatter".into(),
)
.unwrap()
))
.is_err(),
"invalid symlink entry be rejected"
);
}
}

View file

@ -17,7 +17,8 @@ use tracing::{instrument, trace, warn, Level};
use url::Url;
use super::{
DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, RootToLeavesValidator,
Directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator,
RootToLeavesValidator,
};
use crate::composition::{CompositionContext, ServiceBuilder};
use crate::{proto, B3Digest, Error};
@ -78,13 +79,13 @@ impl DirectoryService for ObjectStoreDirectoryService {
/// This is the same steps as for get_recursive anyways, so we just call get_recursive and
/// return the first element of the stream and drop the request.
#[instrument(skip(self, digest), fields(directory.digest = %digest))]
async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
self.get_recursive(digest).take(1).next().await.transpose()
}
#[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
if !directory.directories.is_empty() {
async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
if directory.directories().next().is_some() {
return Err(Error::InvalidRequest(
"only put_multiple_start is supported by the ObjectStoreDirectoryService for directories with children".into(),
));
@ -99,7 +100,7 @@ impl DirectoryService for ObjectStoreDirectoryService {
fn get_recursive(
&self,
root_directory_digest: &B3Digest,
) -> BoxStream<'static, Result<proto::Directory, Error>> {
) -> BoxStream<'static, Result<Directory, Error>> {
// Check that we are not passing on bogus from the object store to the client, and that the
// trust chain from the root digest to the leaves is intact
let mut order_validator =
@ -145,6 +146,10 @@ impl DirectoryService for ObjectStoreDirectoryService {
warn!("unable to parse directory {}: {}", digest, e);
Error::StorageError(e.to_string())
})?;
let directory = Directory::try_from(directory).map_err(|e| {
warn!("unable to convert directory {}: {}", digest, e);
Error::StorageError(e.to_string())
})?;
// Allow the children to appear next
order_validator.add_directory_unchecked(&directory);
@ -244,7 +249,7 @@ impl ObjectStoreDirectoryPutter {
#[async_trait]
impl DirectoryPutter for ObjectStoreDirectoryPutter {
#[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
async fn put(&mut self, directory: Directory) -> Result<(), Error> {
match self.directory_validator {
None => return Err(Error::StorageError("already closed".to_string())),
Some(ref mut validator) => {
@ -302,7 +307,7 @@ impl DirectoryPutter for ObjectStoreDirectoryPutter {
for directory in directories {
directories_sink
.send(directory.encode_to_vec().into())
.send(proto::Directory::from(directory).encode_to_vec().into())
.await?;
}

View file

@ -1,7 +1,8 @@
use std::collections::HashSet;
use tracing::warn;
use crate::{proto::Directory, B3Digest};
use super::Directory;
use crate::B3Digest;
pub trait OrderValidator {
/// Update the order validator's state with the directory
@ -47,10 +48,9 @@ impl RootToLeavesValidator {
self.expected_digests.insert(directory.digest());
}
for subdir in &directory.directories {
for subdir in directory.directories() {
// Allow the children to appear next
let subdir_digest = subdir.digest.clone().try_into().unwrap();
self.expected_digests.insert(subdir_digest);
self.expected_digests.insert(subdir.digest.clone());
}
}
}
@ -79,12 +79,11 @@ impl OrderValidator for LeavesToRootValidator {
fn add_directory(&mut self, directory: &Directory) -> bool {
let digest = directory.digest();
for subdir in &directory.directories {
let subdir_digest = subdir.digest.clone().try_into().unwrap(); // this has been validated in validate_directory()
if !self.allowed_references.contains(&subdir_digest) {
for subdir in directory.directories() {
if !self.allowed_references.contains(&subdir.digest) {
warn!(
directory.digest = %digest,
subdirectory.digest = %subdir_digest,
subdirectory.digest = %subdir.digest,
"unexpected directory reference"
);
return false;
@ -101,8 +100,8 @@ impl OrderValidator for LeavesToRootValidator {
mod tests {
use super::{LeavesToRootValidator, RootToLeavesValidator};
use crate::directoryservice::order_validator::OrderValidator;
use crate::directoryservice::Directory;
use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C};
use crate::proto::Directory;
use rstest::rstest;
#[rstest]

View file

@ -5,15 +5,15 @@ use std::{path::PathBuf, sync::Arc};
use tonic::async_trait;
use tracing::{instrument, warn};
use super::{
traverse_directory, Directory, DirectoryGraph, DirectoryPutter, DirectoryService,
LeavesToRootValidator,
};
use crate::{
composition::{CompositionContext, ServiceBuilder},
digests, proto, B3Digest, Error,
};
use super::{
traverse_directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator,
};
const DIRECTORY_TABLE: TableDefinition<[u8; digests::B3_LEN], Vec<u8>> =
TableDefinition::new("directory");
@ -69,7 +69,7 @@ fn create_schema(db: &redb::Database) -> Result<(), redb::Error> {
#[async_trait]
impl DirectoryService for RedbDirectoryService {
#[instrument(skip(self, digest), fields(directory.digest = %digest))]
async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
let db = self.db.clone();
// Retrieves the protobuf-encoded Directory for the corresponding digest.
@ -107,13 +107,10 @@ impl DirectoryService for RedbDirectoryService {
let directory = match proto::Directory::decode(&*directory_data.value()) {
Ok(dir) => {
// The returned Directory must be valid.
if let Err(e) = dir.validate() {
dir.try_into().map_err(|e| {
warn!(err=%e, "Directory failed validation");
return Err(Error::StorageError(
"Directory failed validation".to_string(),
));
}
dir
Error::StorageError("Directory failed validation".to_string())
})?
}
Err(e) => {
warn!(err=%e, "failed to parse Directory");
@ -125,26 +122,21 @@ impl DirectoryService for RedbDirectoryService {
}
#[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
tokio::task::spawn_blocking({
let db = self.db.clone();
move || {
let digest = directory.digest();
// Validate the directory.
if let Err(e) = directory.validate() {
warn!(err=%e, "Directory failed validation");
return Err(Error::StorageError(
"Directory failed validation".to_string(),
));
}
// Store the directory in the table.
let txn = db.begin_write()?;
{
let mut table = txn.open_table(DIRECTORY_TABLE)?;
let digest_as_array: [u8; digests::B3_LEN] = digest.clone().into();
table.insert(digest_as_array, directory.encode_to_vec())?;
table.insert(
digest_as_array,
proto::Directory::from(directory).encode_to_vec(),
)?;
}
txn.commit()?;
@ -158,7 +150,7 @@ impl DirectoryService for RedbDirectoryService {
fn get_recursive(
&self,
root_directory_digest: &B3Digest,
) -> BoxStream<'static, Result<proto::Directory, Error>> {
) -> BoxStream<'static, Result<Directory, Error>> {
// FUTUREWORK: Ideally we should have all of the directory traversing happen in a single
// redb transaction to avoid constantly closing and opening new transactions for the
// database.
@ -185,7 +177,7 @@ pub struct RedbDirectoryPutter {
#[async_trait]
impl DirectoryPutter for RedbDirectoryPutter {
#[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
async fn put(&mut self, directory: Directory) -> Result<(), Error> {
match self.directory_validator {
None => return Err(Error::StorageError("already closed".to_string())),
Some(ref mut validator) => {
@ -228,7 +220,10 @@ impl DirectoryPutter for RedbDirectoryPutter {
for directory in directories {
let digest_as_array: [u8; digests::B3_LEN] =
directory.digest().into();
table.insert(digest_as_array, directory.encode_to_vec())?;
table.insert(
digest_as_array,
proto::Directory::from(directory).encode_to_vec(),
)?;
}
}

View file

@ -1,7 +1,6 @@
use super::DirectoryPutter;
use super::DirectoryService;
use super::{DirectoryGraph, LeavesToRootValidator};
use crate::proto;
use super::{Directory, DirectoryGraph, LeavesToRootValidator};
use crate::B3Digest;
use crate::Error;
use tonic::async_trait;
@ -29,7 +28,7 @@ impl<DS: DirectoryService> SimplePutter<DS> {
#[async_trait]
impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> {
#[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
async fn put(&mut self, directory: Directory) -> Result<(), Error> {
match self.directory_validator {
None => return Err(Error::StorageError("already closed".to_string())),
Some(ref mut validator) => {

View file

@ -1,5 +1,3 @@
use crate::proto::Directory;
use crate::{proto, B3Digest, Error};
use futures::stream::BoxStream;
use prost::Message;
use std::ops::Deref;
@ -9,8 +7,9 @@ use tonic::async_trait;
use tracing::{instrument, warn};
use super::utils::traverse_directory;
use super::{DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator};
use super::{Directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator};
use crate::composition::{CompositionContext, ServiceBuilder};
use crate::{proto, B3Digest, Error};
#[derive(Clone)]
pub struct SledDirectoryService {
@ -44,7 +43,7 @@ impl SledDirectoryService {
#[async_trait]
impl DirectoryService for SledDirectoryService {
#[instrument(skip(self, digest), fields(directory.digest = %digest))]
async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
let resp = tokio::task::spawn_blocking({
let db = self.db.clone();
let digest = digest.clone();
@ -61,7 +60,7 @@ impl DirectoryService for SledDirectoryService {
None => Ok(None),
// The directory was found, try to parse the data as Directory message
Some(data) => match Directory::decode(&*data) {
Some(data) => match proto::Directory::decode(&*data) {
Ok(directory) => {
// Validate the retrieved Directory indeed has the
// digest we expect it to have, to detect corruptions.
@ -73,14 +72,10 @@ impl DirectoryService for SledDirectoryService {
)));
}
// Validate the Directory itself is valid.
if let Err(e) = directory.validate() {
warn!("directory failed validation: {}", e.to_string());
return Err(Error::StorageError(format!(
"directory {} failed validation: {}",
actual_digest, e,
)));
}
let directory = directory.try_into().map_err(|e| {
warn!("failed to retrieve directory: {}", e);
Error::StorageError(format!("failed to retrieve directory: {}", e))
})?;
Ok(Some(directory))
}
@ -93,22 +88,18 @@ impl DirectoryService for SledDirectoryService {
}
#[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
tokio::task::spawn_blocking({
let db = self.db.clone();
move || {
let digest = directory.digest();
// validate the directory itself.
if let Err(e) = directory.validate() {
return Err(Error::InvalidRequest(format!(
"directory {} failed validation: {}",
digest, e,
)));
}
// store it
db.insert(digest.as_slice(), directory.encode_to_vec())
.map_err(|e| Error::StorageError(e.to_string()))?;
db.insert(
digest.as_slice(),
proto::Directory::from(directory).encode_to_vec(),
)
.map_err(|e| Error::StorageError(e.to_string()))?;
Ok(digest)
}
@ -120,7 +111,7 @@ impl DirectoryService for SledDirectoryService {
fn get_recursive(
&self,
root_directory_digest: &B3Digest,
) -> BoxStream<'static, Result<proto::Directory, Error>> {
) -> BoxStream<'static, Result<Directory, Error>> {
traverse_directory(self.clone(), root_directory_digest)
}
@ -215,7 +206,7 @@ pub struct SledDirectoryPutter {
#[async_trait]
impl DirectoryPutter for SledDirectoryPutter {
#[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
async fn put(&mut self, directory: Directory) -> Result<(), Error> {
match self.directory_validator {
None => return Err(Error::StorageError("already closed".to_string())),
Some(ref mut validator) => {
@ -252,7 +243,10 @@ impl DirectoryPutter for SledDirectoryPutter {
let mut batch = sled::Batch::default();
for directory in directories {
batch.insert(directory.digest().as_slice(), directory.encode_to_vec());
batch.insert(
directory.digest().as_slice(),
proto::Directory::from(directory).encode_to_vec(),
);
}
tree.apply_batch(batch).map_err(|e| {

View file

@ -8,10 +8,8 @@ use rstest_reuse::{self, *};
use super::DirectoryService;
use crate::directoryservice;
use crate::{
fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C, DIRECTORY_D},
proto::{self, Directory},
};
use crate::directoryservice::{Directory, DirectoryNode, Node};
use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C, DIRECTORY_D};
mod utils;
use self::utils::make_grpc_directory_service_client;
@ -41,10 +39,10 @@ async fn test_non_exist(directory_service: impl DirectoryService) {
// recursive get
assert_eq!(
Vec::<Result<proto::Directory, crate::Error>>::new(),
Vec::<Result<Directory, crate::Error>>::new(),
directory_service
.get_recursive(&DIRECTORY_A.digest())
.collect::<Vec<Result<proto::Directory, crate::Error>>>()
.collect::<Vec<Result<Directory, crate::Error>>>()
.await
);
}
@ -212,59 +210,26 @@ async fn upload_reject_dangling_pointer(directory_service: impl DirectoryService
}
}
/// Try uploading a Directory failing its internal validation, ensure it gets
/// rejected.
#[apply(directory_services)]
#[tokio::test]
async fn upload_reject_failing_validation(directory_service: impl DirectoryService) {
let broken_directory = Directory {
symlinks: vec![proto::SymlinkNode {
name: "".into(), // wrong!
target: "doesntmatter".into(),
}],
..Default::default()
};
assert!(broken_directory.validate().is_err());
// Try to upload via single upload.
assert!(
directory_service
.put(broken_directory.clone())
.await
.is_err(),
"single upload must fail"
);
// Try to upload via put_multiple. We're a bit more permissive here, the
// intermediate .put() might succeed, due to client-side bursting (in the
// case of gRPC), but then the close MUST fail.
let mut handle = directory_service.put_multiple_start();
if handle.put(broken_directory).await.is_ok() {
assert!(
handle.close().await.is_err(),
"when succeeding put, close must fail"
)
}
}
/// Try uploading a Directory that refers to a previously-uploaded directory.
/// Both pass their isolated validation, but the size field in the parent is wrong.
/// This should be rejected.
#[apply(directory_services)]
#[tokio::test]
async fn upload_reject_wrong_size(directory_service: impl DirectoryService) {
let wrong_parent_directory = Directory {
directories: vec![proto::DirectoryNode {
name: "foo".into(),
digest: DIRECTORY_A.digest().into(),
size: DIRECTORY_A.size() + 42, // wrong!
}],
..Default::default()
let wrong_parent_directory = {
let mut dir = Directory::new();
dir.add(Node::Directory(
DirectoryNode::new(
"foo".into(),
DIRECTORY_A.digest(),
DIRECTORY_A.size() + 42, // wrong!
)
.unwrap(),
))
.unwrap();
dir
};
// Make sure isolated validation itself is ok
assert!(wrong_parent_directory.validate().is_ok());
// Now upload both. Ensure it either fails during the second put, or during
// the close.
let mut handle = directory_service.put_multiple_start();

View file

@ -1,8 +1,5 @@
use super::DirectoryService;
use crate::{
proto::{node::Node, NamedNode},
B3Digest, Error, Path,
};
use super::{DirectoryService, NamedNode, Node};
use crate::{Error, Path};
use tracing::{instrument, warn};
/// This descends from a (root) node to the given (sub)path, returning the Node
@ -25,34 +22,31 @@ where
return Ok(None);
}
Node::Directory(directory_node) => {
let digest: B3Digest = directory_node
.digest
.try_into()
.map_err(|_e| Error::StorageError("invalid digest length".to_string()))?;
// fetch the linked node from the directory_service.
let directory =
directory_service
.as_ref()
.get(&digest)
.await?
.ok_or_else(|| {
// If we didn't get the directory node that's linked, that's a store inconsistency, bail out!
warn!("directory {} does not exist", digest);
let directory = directory_service
.as_ref()
.get(&directory_node.digest)
.await?
.ok_or_else(|| {
// If we didn't get the directory node that's linked, that's a store inconsistency, bail out!
warn!("directory {} does not exist", directory_node.digest);
Error::StorageError(format!("directory {} does not exist", digest))
})?;
Error::StorageError(format!(
"directory {} does not exist",
directory_node.digest
))
})?;
// look for the component in the [Directory].
// FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we
// could stop as soon as e.name is larger than the search string.
if let Some(child_node) = directory.nodes().find(|n| n.get_name() == component) {
// child node found, update prev_node to that and continue.
parent_node = child_node;
parent_node = child_node.clone();
} else {
// child node not found means there's no such element inside the directory.
return Ok(None);
}
};
}
}
}
@ -65,6 +59,7 @@ where
mod tests {
use crate::{
directoryservice,
directoryservice::{DirectoryNode, Node},
fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP},
PathBuf,
};
@ -88,21 +83,21 @@ mod tests {
handle.close().await.expect("must upload");
// construct the node for DIRECTORY_COMPLICATED
let node_directory_complicated =
crate::proto::node::Node::Directory(crate::proto::DirectoryNode {
name: "doesntmatter".into(),
digest: DIRECTORY_COMPLICATED.digest().into(),
size: DIRECTORY_COMPLICATED.size(),
});
// construct the node for DIRECTORY_COMPLICATED
let node_directory_with_keep = crate::proto::node::Node::Directory(
DIRECTORY_COMPLICATED.directories.first().unwrap().clone(),
let node_directory_complicated = Node::Directory(
DirectoryNode::new(
"doesntmatter".into(),
DIRECTORY_COMPLICATED.digest(),
DIRECTORY_COMPLICATED.size(),
)
.unwrap(),
);
// construct the node for DIRECTORY_COMPLICATED
let node_directory_with_keep =
Node::Directory(DIRECTORY_COMPLICATED.directories().next().unwrap().clone());
// construct the node for the .keep file
let node_file_keep =
crate::proto::node::Node::File(DIRECTORY_WITH_KEEP.files.first().unwrap().clone());
let node_file_keep = Node::File(DIRECTORY_WITH_KEEP.files().next().unwrap().clone());
// traversal to an empty subpath should return the root node.
{

View file

@ -1,5 +1,5 @@
use super::Directory;
use super::DirectoryService;
use crate::proto;
use crate::B3Digest;
use crate::Error;
use async_stream::try_stream;
@ -8,14 +8,14 @@ use std::collections::{HashSet, VecDeque};
use tracing::instrument;
use tracing::warn;
/// Traverses a [proto::Directory] from the root to the children.
/// Traverses a [Directory] from the root to the children.
///
/// This is mostly BFS, but directories are only returned once.
#[instrument(skip(directory_service))]
pub fn traverse_directory<'a, DS: DirectoryService + 'static>(
directory_service: DS,
root_directory_digest: &B3Digest,
) -> BoxStream<'a, Result<proto::Directory, Error>> {
) -> BoxStream<'a, Result<Directory, Error>> {
// The list of all directories that still need to be traversed. The next
// element is picked from the front, new elements are enqueued at the
// back.
@ -50,16 +50,6 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>(
Some(dir) => dir,
};
// validate, we don't want to send invalid directories.
current_directory.validate().map_err(|e| {
warn!("directory failed validation: {}", e.to_string());
Error::StorageError(format!(
"invalid directory: {}",
current_directory_digest
))
})?;
// We're about to send this directory, so let's avoid sending it again if a
// descendant has it.
sent_directory_digests.insert(current_directory_digest);
@ -67,9 +57,9 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>(
// enqueue all child directory digests to the work queue, as
// long as they're not part of the worklist or already sent.
// This panics if the digest looks invalid, it's supposed to be checked first.
for child_directory_node in &current_directory.directories {
for child_directory_node in current_directory.directories() {
// TODO: propagate error
let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap();
let child_digest: B3Digest = child_directory_node.digest.clone();
if worklist_directory_digests.contains(&child_digest)
|| sent_directory_digests.contains(&child_digest)

View file

@ -1,3 +1,4 @@
use bstr::ByteSlice;
use thiserror::Error;
use tokio::task::JoinError;
use tonic::Status;
@ -12,6 +13,46 @@ pub enum Error {
StorageError(String),
}
/// Errors that can occur during the validation of [Directory] messages.
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum ValidateDirectoryError {
/// Elements are not in sorted order
#[error("{:?} is not sorted", .0.as_bstr())]
WrongSorting(Vec<u8>),
/// Multiple elements with the same name encountered
#[error("{:?} is a duplicate name", .0.as_bstr())]
DuplicateName(Vec<u8>),
/// Invalid node
#[error("invalid node with name {:?}: {:?}", .0.as_bstr(), .1.to_string())]
InvalidNode(Vec<u8>, ValidateNodeError),
#[error("Total size exceeds u32::MAX")]
SizeOverflow,
}
/// Errors that occur during Node validation
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum ValidateNodeError {
#[error("No node set")]
NoNodeSet,
/// Invalid digest length encountered
#[error("invalid digest length: {0}")]
InvalidDigestLen(usize),
/// Invalid name encountered
#[error("Invalid name: {}", .0.as_bstr())]
InvalidName(bytes::Bytes),
/// Invalid symlink target
#[error("Invalid symlink target: {}", .0.as_bstr())]
InvalidSymlinkTarget(bytes::Bytes),
}
impl From<crate::digests::Error> for ValidateNodeError {
fn from(e: crate::digests::Error) -> Self {
match e {
crate::digests::Error::InvalidDigestLen(n) => ValidateNodeError::InvalidDigestLen(n),
}
}
}
impl From<JoinError> for Error {
fn from(value: JoinError) -> Self {
Error::StorageError(value.to_string())

View file

@ -1,5 +1,5 @@
use crate::{
proto::{self, Directory, DirectoryNode, FileNode, SymlinkNode},
directoryservice::{Directory, DirectoryNode, FileNode, Node, SymlinkNode},
B3Digest,
};
use lazy_static::lazy_static;
@ -34,70 +34,72 @@ lazy_static! {
pub static ref BLOB_B_DIGEST: B3Digest = blake3::hash(&BLOB_B).as_bytes().into();
// Directories
pub static ref DIRECTORY_WITH_KEEP: proto::Directory = proto::Directory {
directories: vec![],
files: vec![FileNode {
name: b".keep".to_vec().into(),
digest: EMPTY_BLOB_DIGEST.clone().into(),
size: 0,
executable: false,
}],
symlinks: vec![],
pub static ref DIRECTORY_WITH_KEEP: Directory = {
let mut dir = Directory::new();
dir.add(Node::File(FileNode::new(
b".keep".to_vec().into(),
EMPTY_BLOB_DIGEST.clone(),
0,
false
).unwrap())).unwrap();
dir
};
pub static ref DIRECTORY_COMPLICATED: proto::Directory = proto::Directory {
directories: vec![DirectoryNode {
name: b"keep".to_vec().into(),
digest: DIRECTORY_WITH_KEEP.digest().into(),
size: DIRECTORY_WITH_KEEP.size(),
}],
files: vec![FileNode {
name: b".keep".to_vec().into(),
digest: EMPTY_BLOB_DIGEST.clone().into(),
size: 0,
executable: false,
}],
symlinks: vec![SymlinkNode {
name: b"aa".to_vec().into(),
target: b"/nix/store/somewhereelse".to_vec().into(),
}],
pub static ref DIRECTORY_COMPLICATED: Directory = {
let mut dir = Directory::new();
dir.add(Node::Directory(DirectoryNode::new(
b"keep".to_vec().into(),
DIRECTORY_WITH_KEEP.digest(),
DIRECTORY_WITH_KEEP.size()
).unwrap())).unwrap();
dir.add(Node::File(FileNode::new(
b".keep".to_vec().into(),
EMPTY_BLOB_DIGEST.clone(),
0,
false
).unwrap())).unwrap();
dir.add(Node::Symlink(SymlinkNode::new(
b"aa".to_vec().into(),
b"/nix/store/somewhereelse".to_vec().into()
).unwrap())).unwrap();
dir
};
pub static ref DIRECTORY_A: Directory = Directory::default();
pub static ref DIRECTORY_B: Directory = Directory {
directories: vec![DirectoryNode {
name: b"a".to_vec().into(),
digest: DIRECTORY_A.digest().into(),
size: DIRECTORY_A.size(),
}],
..Default::default()
pub static ref DIRECTORY_A: Directory = Directory::new();
pub static ref DIRECTORY_B: Directory = {
let mut dir = Directory::new();
dir.add(Node::Directory(DirectoryNode::new(
b"a".to_vec().into(),
DIRECTORY_A.digest(),
DIRECTORY_A.size(),
).unwrap())).unwrap();
dir
};
pub static ref DIRECTORY_C: Directory = Directory {
directories: vec![
DirectoryNode {
name: b"a".to_vec().into(),
digest: DIRECTORY_A.digest().into(),
size: DIRECTORY_A.size(),
},
DirectoryNode {
name: b"a'".to_vec().into(),
digest: DIRECTORY_A.digest().into(),
size: DIRECTORY_A.size(),
}
],
..Default::default()
pub static ref DIRECTORY_C: Directory = {
let mut dir = Directory::new();
dir.add(Node::Directory(DirectoryNode::new(
b"a".to_vec().into(),
DIRECTORY_A.digest(),
DIRECTORY_A.size(),
).unwrap())).unwrap();
dir.add(Node::Directory(DirectoryNode::new(
b"a'".to_vec().into(),
DIRECTORY_A.digest(),
DIRECTORY_A.size(),
).unwrap())).unwrap();
dir
};
pub static ref DIRECTORY_D: proto::Directory = proto::Directory {
directories: vec![
DirectoryNode {
name: b"a".to_vec().into(),
digest: DIRECTORY_A.digest().into(),
size: DIRECTORY_A.size(),
},
DirectoryNode {
name: b"b'".to_vec().into(),
digest: DIRECTORY_B.digest().into(),
size: DIRECTORY_B.size(),
}
],
..Default::default()
pub static ref DIRECTORY_D: Directory = {
let mut dir = Directory::new();
dir.add(Node::Directory(DirectoryNode::new(
b"a".to_vec().into(),
DIRECTORY_A.digest(),
DIRECTORY_A.size(),
).unwrap())).unwrap();
dir.add(Node::Directory(DirectoryNode::new(
b"b".to_vec().into(),
DIRECTORY_B.digest(),
DIRECTORY_B.size(),
).unwrap())).unwrap();
dir
};
}

View file

@ -13,11 +13,11 @@ use tokio_stream::{wrappers::ReadDirStream, StreamExt};
use super::FuseDaemon;
use crate::fs::{TvixStoreFs, XATTR_NAME_BLOB_DIGEST, XATTR_NAME_DIRECTORY_DIGEST};
use crate::proto as castorepb;
use crate::proto::node::Node;
use crate::{
blobservice::{BlobService, MemoryBlobService},
directoryservice::{DirectoryService, MemoryDirectoryService},
directoryservice::{
DirectoryNode, DirectoryService, FileNode, MemoryDirectoryService, Node, SymlinkNode,
},
fixtures,
};
@ -70,12 +70,15 @@ async fn populate_blob_a(
root_nodes.insert(
BLOB_A_NAME.into(),
Node::File(castorepb::FileNode {
name: BLOB_A_NAME.into(),
digest: fixtures::BLOB_A_DIGEST.clone().into(),
size: fixtures::BLOB_A.len() as u64,
executable: false,
}),
Node::File(
FileNode::new(
BLOB_A_NAME.into(),
fixtures::BLOB_A_DIGEST.clone(),
fixtures::BLOB_A.len() as u64,
false,
)
.unwrap(),
),
);
}
@ -91,12 +94,15 @@ async fn populate_blob_b(
root_nodes.insert(
BLOB_B_NAME.into(),
Node::File(castorepb::FileNode {
name: BLOB_B_NAME.into(),
digest: fixtures::BLOB_B_DIGEST.clone().into(),
size: fixtures::BLOB_B.len() as u64,
executable: false,
}),
Node::File(
FileNode::new(
BLOB_B_NAME.into(),
fixtures::BLOB_B_DIGEST.clone(),
fixtures::BLOB_B.len() as u64,
false,
)
.unwrap(),
),
);
}
@ -116,22 +122,22 @@ async fn populate_blob_helloworld(
root_nodes.insert(
HELLOWORLD_BLOB_NAME.into(),
Node::File(castorepb::FileNode {
name: HELLOWORLD_BLOB_NAME.into(),
digest: fixtures::HELLOWORLD_BLOB_DIGEST.clone().into(),
size: fixtures::HELLOWORLD_BLOB_CONTENTS.len() as u64,
executable: true,
}),
Node::File(
FileNode::new(
HELLOWORLD_BLOB_NAME.into(),
fixtures::HELLOWORLD_BLOB_DIGEST.clone(),
fixtures::HELLOWORLD_BLOB_CONTENTS.len() as u64,
true,
)
.unwrap(),
),
);
}
async fn populate_symlink(root_nodes: &mut BTreeMap<Bytes, Node>) {
root_nodes.insert(
SYMLINK_NAME.into(),
Node::Symlink(castorepb::SymlinkNode {
name: SYMLINK_NAME.into(),
target: BLOB_A_NAME.into(),
}),
Node::Symlink(SymlinkNode::new(SYMLINK_NAME.into(), BLOB_A_NAME.into()).unwrap()),
);
}
@ -140,10 +146,9 @@ async fn populate_symlink(root_nodes: &mut BTreeMap<Bytes, Node>) {
async fn populate_symlink2(root_nodes: &mut BTreeMap<Bytes, Node>) {
root_nodes.insert(
SYMLINK_NAME2.into(),
Node::Symlink(castorepb::SymlinkNode {
name: SYMLINK_NAME2.into(),
target: "/nix/store/somewhereelse".into(),
}),
Node::Symlink(
SymlinkNode::new(SYMLINK_NAME2.into(), "/nix/store/somewhereelse".into()).unwrap(),
),
);
}
@ -167,11 +172,14 @@ async fn populate_directory_with_keep(
root_nodes.insert(
DIRECTORY_WITH_KEEP_NAME.into(),
castorepb::node::Node::Directory(castorepb::DirectoryNode {
name: DIRECTORY_WITH_KEEP_NAME.into(),
digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(),
size: fixtures::DIRECTORY_WITH_KEEP.size(),
}),
Node::Directory(
DirectoryNode::new(
DIRECTORY_WITH_KEEP_NAME.into(),
fixtures::DIRECTORY_WITH_KEEP.digest(),
fixtures::DIRECTORY_WITH_KEEP.size(),
)
.unwrap(),
),
);
}
@ -180,11 +188,14 @@ async fn populate_directory_with_keep(
async fn populate_directorynode_without_directory(root_nodes: &mut BTreeMap<Bytes, Node>) {
root_nodes.insert(
DIRECTORY_WITH_KEEP_NAME.into(),
castorepb::node::Node::Directory(castorepb::DirectoryNode {
name: DIRECTORY_WITH_KEEP_NAME.into(),
digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(),
size: fixtures::DIRECTORY_WITH_KEEP.size(),
}),
Node::Directory(
DirectoryNode::new(
DIRECTORY_WITH_KEEP_NAME.into(),
fixtures::DIRECTORY_WITH_KEEP.digest(),
fixtures::DIRECTORY_WITH_KEEP.size(),
)
.unwrap(),
),
);
}
@ -192,12 +203,15 @@ async fn populate_directorynode_without_directory(root_nodes: &mut BTreeMap<Byte
async fn populate_filenode_without_blob(root_nodes: &mut BTreeMap<Bytes, Node>) {
root_nodes.insert(
BLOB_A_NAME.into(),
Node::File(castorepb::FileNode {
name: BLOB_A_NAME.into(),
digest: fixtures::BLOB_A_DIGEST.clone().into(),
size: fixtures::BLOB_A.len() as u64,
executable: false,
}),
Node::File(
FileNode::new(
BLOB_A_NAME.into(),
fixtures::BLOB_A_DIGEST.clone(),
fixtures::BLOB_A.len() as u64,
false,
)
.unwrap(),
),
);
}
@ -227,11 +241,14 @@ async fn populate_directory_complicated(
root_nodes.insert(
DIRECTORY_COMPLICATED_NAME.into(),
Node::Directory(castorepb::DirectoryNode {
name: DIRECTORY_COMPLICATED_NAME.into(),
digest: fixtures::DIRECTORY_COMPLICATED.digest().into(),
size: fixtures::DIRECTORY_COMPLICATED.size(),
}),
Node::Directory(
DirectoryNode::new(
DIRECTORY_COMPLICATED_NAME.into(),
fixtures::DIRECTORY_COMPLICATED.digest(),
fixtures::DIRECTORY_COMPLICATED.size(),
)
.unwrap(),
),
);
}

View file

@ -4,7 +4,7 @@ use std::time::Duration;
use bytes::Bytes;
use crate::proto as castorepb;
use crate::directoryservice::{NamedNode, Node};
use crate::B3Digest;
#[derive(Clone, Debug)]
@ -20,27 +20,24 @@ pub enum InodeData {
/// lookup and did fetch the data.
#[derive(Clone, Debug)]
pub enum DirectoryInodeData {
Sparse(B3Digest, u64), // digest, size
Populated(B3Digest, Vec<(u64, castorepb::node::Node)>), // [(child_inode, node)]
Sparse(B3Digest, u64), // digest, size
Populated(B3Digest, Vec<(u64, Node)>), // [(child_inode, node)]
}
impl InodeData {
/// Constructs a new InodeData by consuming a [Node].
/// It splits off the orginal name, so it can be used later.
pub fn from_node(node: castorepb::node::Node) -> (Self, Bytes) {
pub fn from_node(node: &Node) -> (Self, Bytes) {
match node {
castorepb::node::Node::Directory(n) => (
Self::Directory(DirectoryInodeData::Sparse(
n.digest.try_into().unwrap(),
n.size,
)),
n.name,
Node::Directory(n) => (
Self::Directory(DirectoryInodeData::Sparse(n.digest().clone(), n.size())),
n.get_name().clone(),
),
castorepb::node::Node::File(n) => (
Self::Regular(n.digest.try_into().unwrap(), n.size, n.executable),
n.name,
Node::File(n) => (
Self::Regular(n.digest().clone(), n.size(), n.executable()),
n.get_name().clone(),
),
castorepb::node::Node::Symlink(n) => (Self::Symlink(n.target), n.name),
Node::Symlink(n) => (Self::Symlink(n.target().clone()), n.get_name().clone()),
}
}

View file

@ -15,11 +15,9 @@ use self::{
inode_tracker::InodeTracker,
inodes::{DirectoryInodeData, InodeData},
};
use crate::proto as castorepb;
use crate::{
blobservice::{BlobReader, BlobService},
directoryservice::DirectoryService,
proto::{node::Node, NamedNode},
directoryservice::{DirectoryService, NamedNode, Node},
B3Digest,
};
use bstr::ByteVec;
@ -198,13 +196,13 @@ where
let children = {
let mut inode_tracker = self.inode_tracker.write();
let children: Vec<(u64, castorepb::node::Node)> = directory
let children: Vec<(u64, Node)> = directory
.nodes()
.map(|child_node| {
let (inode_data, _) = InodeData::from_node(child_node.clone());
let (inode_data, _) = InodeData::from_node(child_node);
let child_ino = inode_tracker.put(inode_data);
(child_ino, child_node)
(child_ino, child_node.clone())
})
.collect();
@ -287,7 +285,7 @@ where
// insert the (sparse) inode data and register in
// self.root_nodes.
let (inode_data, name) = InodeData::from_node(root_node);
let (inode_data, name) = InodeData::from_node(&root_node);
let ino = inode_tracker.put(inode_data.clone());
root_nodes.insert(name, ino);
@ -468,7 +466,7 @@ where
io::Error::from_raw_os_error(libc::EIO)
})?;
let (inode_data, name) = InodeData::from_node(root_node);
let (inode_data, name) = InodeData::from_node(&root_node);
// obtain the inode, or allocate a new one.
let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| {
@ -498,7 +496,7 @@ where
Span::current().record("directory.digest", parent_digest.to_string());
for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() {
let (inode_data, name) = InodeData::from_node(child_node);
let (inode_data, name) = InodeData::from_node(&child_node);
// the second parameter will become the "offset" parameter on the next call.
let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry {
@ -555,7 +553,7 @@ where
io::Error::from_raw_os_error(libc::EPERM)
})?;
let (inode_data, name) = InodeData::from_node(root_node);
let (inode_data, name) = InodeData::from_node(&root_node);
// obtain the inode, or allocate a new one.
let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| {
@ -588,7 +586,7 @@ where
Span::current().record("directory.digest", parent_digest.to_string());
for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() {
let (inode_data, name) = InodeData::from_node(child_node);
let (inode_data, name) = InodeData::from_node(&child_node);
// the second parameter will become the "offset" parameter on the next call.
let written = add_entry(

View file

@ -1,6 +1,6 @@
use std::collections::BTreeMap;
use crate::{proto::node::Node, Error};
use crate::{directoryservice::Node, Error};
use bytes::Bytes;
use futures::stream::BoxStream;
use tonic::async_trait;

View file

@ -11,9 +11,8 @@ use tokio_tar::Archive;
use tracing::{instrument, warn, Level};
use crate::blobservice::BlobService;
use crate::directoryservice::DirectoryService;
use crate::directoryservice::{DirectoryService, Node};
use crate::import::{ingest_entries, IngestionEntry, IngestionError};
use crate::proto::node::Node;
use super::blobs::{self, ConcurrentBlobUploader};

View file

@ -15,8 +15,7 @@ use walkdir::DirEntry;
use walkdir::WalkDir;
use crate::blobservice::BlobService;
use crate::directoryservice::DirectoryService;
use crate::proto::node::Node;
use crate::directoryservice::{DirectoryService, Node};
use crate::B3Digest;
use super::ingest_entries;

View file

@ -4,14 +4,14 @@
//! Specific implementations, such as ingesting from the filesystem, live in
//! child modules.
use crate::directoryservice::Directory;
use crate::directoryservice::DirectoryNode;
use crate::directoryservice::DirectoryPutter;
use crate::directoryservice::DirectoryService;
use crate::directoryservice::FileNode;
use crate::directoryservice::Node;
use crate::directoryservice::SymlinkNode;
use crate::path::{Path, PathBuf};
use crate::proto::node::Node;
use crate::proto::Directory;
use crate::proto::DirectoryNode;
use crate::proto::FileNode;
use crate::proto::SymlinkNode;
use crate::B3Digest;
use futures::{Stream, StreamExt};
use tracing::Level;
@ -98,27 +98,36 @@ where
IngestionError::UploadDirectoryError(entry.path().to_owned(), e)
})?;
Node::Directory(DirectoryNode {
name,
digest: directory_digest.into(),
size: directory_size,
})
Node::Directory(
DirectoryNode::new(name, directory_digest, directory_size).map_err(|e| {
IngestionError::UploadDirectoryError(
entry.path().to_owned(),
crate::Error::StorageError(e.to_string()),
)
})?,
)
}
IngestionEntry::Symlink { ref target, .. } => Node::Symlink(SymlinkNode {
name,
target: target.to_owned().into(),
}),
IngestionEntry::Symlink { ref target, .. } => Node::Symlink(
SymlinkNode::new(name, target.to_owned().into()).map_err(|e| {
IngestionError::UploadDirectoryError(
entry.path().to_owned(),
crate::Error::StorageError(e.to_string()),
)
})?,
),
IngestionEntry::Regular {
size,
executable,
digest,
..
} => Node::File(FileNode {
name,
digest: digest.to_owned().into(),
size: *size,
executable: *executable,
}),
} => Node::File(
FileNode::new(name, digest.clone(), *size, *executable).map_err(|e| {
IngestionError::UploadDirectoryError(
entry.path().to_owned(),
crate::Error::StorageError(e.to_string()),
)
})?,
),
};
let parent = entry
@ -130,7 +139,16 @@ where
break node;
} else {
// record node in parent directory, creating a new [Directory] if not there yet.
directories.entry(parent.to_owned()).or_default().add(node);
directories
.entry(parent.to_owned())
.or_default()
.add(node)
.map_err(|e| {
IngestionError::UploadDirectoryError(
entry.path().to_owned(),
crate::Error::StorageError(e.to_string()),
)
})?;
}
};
@ -156,14 +174,7 @@ where
#[cfg(debug_assertions)]
{
if let Node::Directory(directory_node) = &root_node {
debug_assert_eq!(
root_directory_digest,
directory_node
.digest
.to_vec()
.try_into()
.expect("invalid digest len")
)
debug_assert_eq!(&root_directory_digest, directory_node.digest())
} else {
unreachable!("Tvix bug: directory putter initialized but no root directory node");
}
@ -208,9 +219,8 @@ impl IngestionEntry {
mod test {
use rstest::rstest;
use crate::directoryservice::{Directory, DirectoryNode, FileNode, Node, SymlinkNode};
use crate::fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST};
use crate::proto::node::Node;
use crate::proto::{Directory, DirectoryNode, FileNode, SymlinkNode};
use crate::{directoryservice::MemoryDirectoryService, fixtures::DUMMY_DIGEST};
use super::ingest_entries;
@ -223,18 +233,18 @@ mod test {
executable: true,
digest: DUMMY_DIGEST.clone(),
}],
Node::File(FileNode { name: "foo".into(), digest: DUMMY_DIGEST.clone().into(), size: 42, executable: true }
))]
Node::File(FileNode::new("foo".into(), DUMMY_DIGEST.clone(), 42, true).unwrap())
)]
#[case::single_symlink(vec![IngestionEntry::Symlink {
path: "foo".parse().unwrap(),
target: b"blub".into(),
}],
Node::Symlink(SymlinkNode { name: "foo".into(), target: "blub".into()})
Node::Symlink(SymlinkNode::new("foo".into(), "blub".into()).unwrap())
)]
#[case::single_dir(vec![IngestionEntry::Dir {
path: "foo".parse().unwrap(),
}],
Node::Directory(DirectoryNode { name: "foo".into(), digest: Directory::default().digest().into(), size: Directory::default().size()})
Node::Directory(DirectoryNode::new("foo".into(), Directory::default().digest(), Directory::default().size()).unwrap())
)]
#[case::dir_with_keep(vec![
IngestionEntry::Regular {
@ -247,7 +257,7 @@ mod test {
path: "foo".parse().unwrap(),
},
],
Node::Directory(DirectoryNode { name: "foo".into(), digest: DIRECTORY_WITH_KEEP.digest().into(), size: DIRECTORY_WITH_KEEP.size() })
Node::Directory(DirectoryNode::new("foo".into(), DIRECTORY_WITH_KEEP.digest(), DIRECTORY_WITH_KEEP.size()).unwrap())
)]
/// This is intentionally a bit unsorted, though it still satisfies all
/// requirements we have on the order of elements in the stream.
@ -275,7 +285,7 @@ mod test {
path: "blub".parse().unwrap(),
},
],
Node::Directory(DirectoryNode { name: "blub".into(), digest: DIRECTORY_COMPLICATED.digest().into(), size:DIRECTORY_COMPLICATED.size() })
Node::Directory(DirectoryNode::new("blub".into(), DIRECTORY_COMPLICATED.digest(), DIRECTORY_COMPLICATED.size()).unwrap())
)]
#[tokio::test]
async fn test_ingestion(#[case] entries: Vec<IngestionEntry>, #[case] exp_root_node: Node) {

View file

@ -18,7 +18,7 @@ pub mod proto;
pub mod tonic;
pub use digests::{B3Digest, B3_LEN};
pub use errors::Error;
pub use errors::{Error, ValidateDirectoryError, ValidateNodeError};
pub use hashing_reader::{B3HashingReader, HashingReader};
#[cfg(test)]

View file

@ -10,7 +10,7 @@ use std::{
use bstr::ByteSlice;
use crate::proto::validate_node_name;
use crate::directoryservice::Directory;
/// Represents a Path in the castore model.
/// These are always relative, and platform-independent, which distinguishes
@ -38,7 +38,7 @@ impl Path {
if !bytes.is_empty() {
// Ensure all components are valid castore node names.
for component in bytes.split_str(b"/") {
validate_node_name(component).ok()?;
Directory::validate_node_name(component).ok()?;
}
}
@ -211,7 +211,7 @@ impl PathBuf {
/// Adjoins `name` to self.
pub fn try_push(&mut self, name: &[u8]) -> Result<(), std::io::Error> {
validate_node_name(name).map_err(|_| std::io::ErrorKind::InvalidData)?;
Directory::validate_node_name(name).map_err(|_| std::io::ErrorKind::InvalidData)?;
if !self.inner.is_empty() {
self.inner.push(b'/');

View file

@ -1,7 +1,5 @@
use crate::directoryservice::DirectoryGraph;
use crate::directoryservice::LeavesToRootValidator;
use crate::proto;
use crate::{directoryservice::DirectoryService, B3Digest};
use crate::directoryservice::{DirectoryGraph, DirectoryService, LeavesToRootValidator};
use crate::{proto, B3Digest, ValidateDirectoryError};
use futures::stream::BoxStream;
use futures::TryStreamExt;
use std::ops::Deref;
@ -58,13 +56,16 @@ where
Status::not_found(format!("directory {} not found", digest))
})?;
Box::pin(once(Ok(directory)))
Box::pin(once(Ok(directory.into())))
} else {
// If recursive was requested, traverse via get_recursive.
Box::pin(
self.directory_service.get_recursive(&digest).map_err(|e| {
tonic::Status::new(tonic::Code::Internal, e.to_string())
}),
self.directory_service
.get_recursive(&digest)
.map_ok(proto::Directory::from)
.map_err(|e| {
tonic::Status::new(tonic::Code::Internal, e.to_string())
}),
)
}
}))
@ -83,7 +84,9 @@ where
let mut validator = DirectoryGraph::<LeavesToRootValidator>::default();
while let Some(directory) = req_inner.message().await? {
validator
.add(directory)
.add(directory.try_into().map_err(|e: ValidateDirectoryError| {
tonic::Status::new(tonic::Code::Internal, e.to_string())
})?)
.map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?;
}

View file

@ -1,7 +1,4 @@
#![allow(non_snake_case)]
// https://github.com/hyperium/tonic/issues/1056
use bstr::ByteSlice;
use std::{collections::HashSet, iter::Peekable, str};
use std::str;
use prost::Message;
@ -11,7 +8,8 @@ mod grpc_directoryservice_wrapper;
pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper;
pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper;
use crate::{B3Digest, B3_LEN};
use crate::directoryservice::NamedNode;
use crate::{B3Digest, ValidateDirectoryError, ValidateNodeError};
tonic::include_proto!("tvix.castore.v1");
@ -24,38 +22,6 @@ pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("tvix
#[cfg(test)]
mod tests;
/// Errors that can occur during the validation of [Directory] messages.
#[derive(Debug, PartialEq, Eq, thiserror::Error)]
pub enum ValidateDirectoryError {
/// Elements are not in sorted order
#[error("{:?} is not sorted", .0.as_bstr())]
WrongSorting(Vec<u8>),
/// Multiple elements with the same name encountered
#[error("{:?} is a duplicate name", .0.as_bstr())]
DuplicateName(Vec<u8>),
/// Invalid node
#[error("invalid node with name {:?}: {:?}", .0.as_bstr(), .1.to_string())]
InvalidNode(Vec<u8>, ValidateNodeError),
#[error("Total size exceeds u32::MAX")]
SizeOverflow,
}
/// Errors that occur during Node validation
#[derive(Debug, PartialEq, Eq, thiserror::Error)]
pub enum ValidateNodeError {
#[error("No node set")]
NoNodeSet,
/// Invalid digest length encountered
#[error("Invalid Digest length: {0}")]
InvalidDigestLen(usize),
/// Invalid name encountered
#[error("Invalid name: {}", .0.as_bstr())]
InvalidName(Vec<u8>),
/// Invalid symlink target
#[error("Invalid symlink target: {}", .0.as_bstr())]
InvalidSymlinkTarget(Vec<u8>),
}
/// Errors that occur during StatBlobResponse validation
#[derive(Debug, PartialEq, Eq, thiserror::Error)]
pub enum ValidateStatBlobResponseError {
@ -64,186 +30,6 @@ pub enum ValidateStatBlobResponseError {
InvalidDigestLen(usize, usize),
}
/// Checks a Node name for validity as an intermediate node.
/// We disallow slashes, null bytes, '.', '..' and the empty string.
pub(crate) fn validate_node_name(name: &[u8]) -> Result<(), ValidateNodeError> {
if name.is_empty()
|| name == b".."
|| name == b"."
|| name.contains(&0x00)
|| name.contains(&b'/')
{
Err(ValidateNodeError::InvalidName(name.to_owned()))
} else {
Ok(())
}
}
/// NamedNode is implemented for [FileNode], [DirectoryNode] and [SymlinkNode]
/// and [node::Node], so we can ask all of them for the name easily.
pub trait NamedNode {
fn get_name(&self) -> &[u8];
}
impl NamedNode for &FileNode {
fn get_name(&self) -> &[u8] {
&self.name
}
}
impl NamedNode for &DirectoryNode {
fn get_name(&self) -> &[u8] {
&self.name
}
}
impl NamedNode for &SymlinkNode {
fn get_name(&self) -> &[u8] {
&self.name
}
}
impl NamedNode for node::Node {
fn get_name(&self) -> &[u8] {
match self {
node::Node::File(node_file) => &node_file.name,
node::Node::Directory(node_directory) => &node_directory.name,
node::Node::Symlink(node_symlink) => &node_symlink.name,
}
}
}
impl Node {
/// Ensures the node has a valid enum kind (is Some), and passes its
// per-enum validation.
// The inner root node is returned for easier consumption.
pub fn validate(&self) -> Result<&node::Node, ValidateNodeError> {
if let Some(node) = self.node.as_ref() {
node.validate()?;
Ok(node)
} else {
Err(ValidateNodeError::NoNodeSet)
}
}
}
impl node::Node {
/// Returns the node with a new name.
pub fn rename(self, name: bytes::Bytes) -> Self {
match self {
node::Node::Directory(n) => node::Node::Directory(DirectoryNode { name, ..n }),
node::Node::File(n) => node::Node::File(FileNode { name, ..n }),
node::Node::Symlink(n) => node::Node::Symlink(SymlinkNode { name, ..n }),
}
}
/// Ensures the node has a valid name, and checks the type-specific fields too.
pub fn validate(&self) -> Result<(), ValidateNodeError> {
match self {
// for a directory root node, ensure the digest has the appropriate size.
node::Node::Directory(directory_node) => {
if directory_node.digest.len() != B3_LEN {
Err(ValidateNodeError::InvalidDigestLen(
directory_node.digest.len(),
))?;
}
validate_node_name(&directory_node.name)
}
// for a file root node, ensure the digest has the appropriate size.
node::Node::File(file_node) => {
if file_node.digest.len() != B3_LEN {
Err(ValidateNodeError::InvalidDigestLen(file_node.digest.len()))?;
}
validate_node_name(&file_node.name)
}
// ensure the symlink target is not empty and doesn't contain null bytes.
node::Node::Symlink(symlink_node) => {
if symlink_node.target.is_empty() || symlink_node.target.contains(&b'\0') {
Err(ValidateNodeError::InvalidSymlinkTarget(
symlink_node.target.to_vec(),
))?;
}
validate_node_name(&symlink_node.name)
}
}
}
}
impl PartialOrd for node::Node {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for node::Node {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.get_name().cmp(other.get_name())
}
}
impl PartialOrd for FileNode {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for FileNode {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.get_name().cmp(other.get_name())
}
}
impl PartialOrd for SymlinkNode {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for SymlinkNode {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.get_name().cmp(other.get_name())
}
}
impl PartialOrd for DirectoryNode {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for DirectoryNode {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.get_name().cmp(other.get_name())
}
}
/// Accepts a name, and a mutable reference to the previous name.
/// If the passed name is larger than the previous one, the reference is updated.
/// If it's not, an error is returned.
fn update_if_lt_prev<'n>(
prev_name: &mut &'n [u8],
name: &'n [u8],
) -> Result<(), ValidateDirectoryError> {
if *name < **prev_name {
return Err(ValidateDirectoryError::WrongSorting(name.to_vec()));
}
*prev_name = name;
Ok(())
}
/// Inserts the given name into a HashSet if it's not already in there.
/// If it is, an error is returned.
fn insert_once<'n>(
seen_names: &mut HashSet<&'n [u8]>,
name: &'n [u8],
) -> Result<(), ValidateDirectoryError> {
if seen_names.get(name).is_some() {
return Err(ValidateDirectoryError::DuplicateName(name.to_vec()));
}
seen_names.insert(name);
Ok(())
}
fn checked_sum(iter: impl IntoIterator<Item = u64>) -> Option<u64> {
iter.into_iter().try_fold(0u64, |acc, i| acc.checked_add(i))
}
@ -280,116 +66,213 @@ impl Directory {
.as_bytes()
.into()
}
}
/// validate checks the directory for invalid data, such as:
/// - violations of name restrictions
/// - invalid digest lengths
/// - not properly sorted lists
/// - duplicate names in the three lists
pub fn validate(&self) -> Result<(), ValidateDirectoryError> {
let mut seen_names: HashSet<&[u8]> = HashSet::new();
let mut last_directory_name: &[u8] = b"";
let mut last_file_name: &[u8] = b"";
let mut last_symlink_name: &[u8] = b"";
// check directories
for directory_node in &self.directories {
node::Node::Directory(directory_node.clone())
.validate()
.map_err(|e| {
ValidateDirectoryError::InvalidNode(directory_node.name.to_vec(), e)
})?;
update_if_lt_prev(&mut last_directory_name, &directory_node.name)?;
insert_once(&mut seen_names, &directory_node.name)?;
}
// check files
for file_node in &self.files {
node::Node::File(file_node.clone())
.validate()
.map_err(|e| ValidateDirectoryError::InvalidNode(file_node.name.to_vec(), e))?;
update_if_lt_prev(&mut last_file_name, &file_node.name)?;
insert_once(&mut seen_names, &file_node.name)?;
}
// check symlinks
for symlink_node in &self.symlinks {
node::Node::Symlink(symlink_node.clone())
.validate()
.map_err(|e| ValidateDirectoryError::InvalidNode(symlink_node.name.to_vec(), e))?;
update_if_lt_prev(&mut last_symlink_name, &symlink_node.name)?;
insert_once(&mut seen_names, &symlink_node.name)?;
}
self.size_checked()
.ok_or(ValidateDirectoryError::SizeOverflow)?;
Ok(())
/// Accepts a name, and a mutable reference to the previous name.
/// If the passed name is larger than the previous one, the reference is updated.
/// If it's not, an error is returned.
fn update_if_lt_prev<'n>(
prev_name: &mut &'n [u8],
name: &'n [u8],
) -> Result<(), ValidateDirectoryError> {
if *name < **prev_name {
return Err(ValidateDirectoryError::WrongSorting(name.to_vec()));
}
*prev_name = name;
Ok(())
}
/// Allows iterating over all three nodes ([DirectoryNode], [FileNode],
/// [SymlinkNode]) in an ordered fashion, as long as the individual lists
/// are sorted (which can be checked by the [Directory::validate]).
pub fn nodes(&self) -> DirectoryNodesIterator {
return DirectoryNodesIterator {
i_directories: self.directories.iter().peekable(),
i_files: self.files.iter().peekable(),
i_symlinks: self.symlinks.iter().peekable(),
};
impl TryFrom<&node::Node> for crate::directoryservice::Node {
type Error = ValidateNodeError;
fn try_from(node: &node::Node) -> Result<crate::directoryservice::Node, ValidateNodeError> {
Ok(match node {
node::Node::Directory(n) => crate::directoryservice::Node::Directory(n.try_into()?),
node::Node::File(n) => crate::directoryservice::Node::File(n.try_into()?),
node::Node::Symlink(n) => crate::directoryservice::Node::Symlink(n.try_into()?),
})
}
}
/// Adds the specified [node::Node] to the [Directory], preserving sorted entries.
/// This assumes the [Directory] to be sorted prior to adding the node.
///
/// Inserting an element that already exists with the same name in the directory is not
/// supported.
pub fn add(&mut self, node: node::Node) {
debug_assert!(
!self.files.iter().any(|x| x.get_name() == node.get_name()),
"name already exists in files"
);
debug_assert!(
!self
.directories
.iter()
.any(|x| x.get_name() == node.get_name()),
"name already exists in directories"
);
debug_assert!(
!self
.symlinks
.iter()
.any(|x| x.get_name() == node.get_name()),
"name already exists in symlinks"
);
impl TryFrom<&Node> for crate::directoryservice::Node {
type Error = ValidateNodeError;
fn try_from(node: &Node) -> Result<crate::directoryservice::Node, ValidateNodeError> {
match node {
node::Node::File(node) => {
let pos = self
.files
.binary_search(&node)
.expect_err("Tvix bug: dir entry with name already exists");
self.files.insert(pos, node);
}
node::Node::Directory(node) => {
let pos = self
.directories
.binary_search(&node)
.expect_err("Tvix bug: dir entry with name already exists");
self.directories.insert(pos, node);
}
node::Node::Symlink(node) => {
let pos = self
.symlinks
.binary_search(&node)
.expect_err("Tvix bug: dir entry with name already exists");
self.symlinks.insert(pos, node);
Node { node: None } => Err(ValidateNodeError::NoNodeSet),
Node { node: Some(node) } => node.try_into(),
}
}
}
impl TryFrom<&DirectoryNode> for crate::directoryservice::DirectoryNode {
type Error = ValidateNodeError;
fn try_from(
node: &DirectoryNode,
) -> Result<crate::directoryservice::DirectoryNode, ValidateNodeError> {
crate::directoryservice::DirectoryNode::new(
node.name.clone(),
node.digest.clone().try_into()?,
node.size,
)
}
}
impl TryFrom<&SymlinkNode> for crate::directoryservice::SymlinkNode {
type Error = ValidateNodeError;
fn try_from(
node: &SymlinkNode,
) -> Result<crate::directoryservice::SymlinkNode, ValidateNodeError> {
crate::directoryservice::SymlinkNode::new(node.name.clone(), node.target.clone())
}
}
impl TryFrom<&FileNode> for crate::directoryservice::FileNode {
type Error = ValidateNodeError;
fn try_from(node: &FileNode) -> Result<crate::directoryservice::FileNode, ValidateNodeError> {
crate::directoryservice::FileNode::new(
node.name.clone(),
node.digest.clone().try_into()?,
node.size,
node.executable,
)
}
}
impl TryFrom<Directory> for crate::directoryservice::Directory {
type Error = ValidateDirectoryError;
fn try_from(
directory: Directory,
) -> Result<crate::directoryservice::Directory, ValidateDirectoryError> {
(&directory).try_into()
}
}
impl TryFrom<&Directory> for crate::directoryservice::Directory {
type Error = ValidateDirectoryError;
fn try_from(
directory: &Directory,
) -> Result<crate::directoryservice::Directory, ValidateDirectoryError> {
let mut dir = crate::directoryservice::Directory::new();
let mut last_file_name: &[u8] = b"";
for file in directory.files.iter().map(move |file| {
update_if_lt_prev(&mut last_file_name, &file.name).map(|()| file.clone())
}) {
let file = file?;
dir.add(crate::directoryservice::Node::File(
(&file)
.try_into()
.map_err(|e| ValidateDirectoryError::InvalidNode(file.name.into(), e))?,
))?;
}
let mut last_directory_name: &[u8] = b"";
for directory in directory.directories.iter().map(move |directory| {
update_if_lt_prev(&mut last_directory_name, &directory.name).map(|()| directory.clone())
}) {
let directory = directory?;
dir.add(crate::directoryservice::Node::Directory(
(&directory)
.try_into()
.map_err(|e| ValidateDirectoryError::InvalidNode(directory.name.into(), e))?,
))?;
}
let mut last_symlink_name: &[u8] = b"";
for symlink in directory.symlinks.iter().map(move |symlink| {
update_if_lt_prev(&mut last_symlink_name, &symlink.name).map(|()| symlink.clone())
}) {
let symlink = symlink?;
dir.add(crate::directoryservice::Node::Symlink(
(&symlink)
.try_into()
.map_err(|e| ValidateDirectoryError::InvalidNode(symlink.name.into(), e))?,
))?;
}
Ok(dir)
}
}
impl From<&crate::directoryservice::Node> for node::Node {
fn from(node: &crate::directoryservice::Node) -> node::Node {
match node {
crate::directoryservice::Node::Directory(n) => node::Node::Directory(n.into()),
crate::directoryservice::Node::File(n) => node::Node::File(n.into()),
crate::directoryservice::Node::Symlink(n) => node::Node::Symlink(n.into()),
}
}
}
impl From<&crate::directoryservice::Node> for Node {
fn from(node: &crate::directoryservice::Node) -> Node {
Node {
node: Some(node.into()),
}
}
}
impl From<&crate::directoryservice::DirectoryNode> for DirectoryNode {
fn from(node: &crate::directoryservice::DirectoryNode) -> DirectoryNode {
DirectoryNode {
digest: node.digest().clone().into(),
size: node.size(),
name: node.get_name().clone(),
}
}
}
impl From<&crate::directoryservice::FileNode> for FileNode {
fn from(node: &crate::directoryservice::FileNode) -> FileNode {
FileNode {
digest: node.digest().clone().into(),
size: node.size(),
name: node.get_name().clone(),
executable: node.executable(),
}
}
}
impl From<&crate::directoryservice::SymlinkNode> for SymlinkNode {
fn from(node: &crate::directoryservice::SymlinkNode) -> SymlinkNode {
SymlinkNode {
name: node.get_name().clone(),
target: node.target().clone(),
}
}
}
impl From<crate::directoryservice::Directory> for Directory {
fn from(directory: crate::directoryservice::Directory) -> Directory {
(&directory).into()
}
}
impl From<&crate::directoryservice::Directory> for Directory {
fn from(directory: &crate::directoryservice::Directory) -> Directory {
let mut directories = vec![];
let mut files = vec![];
let mut symlinks = vec![];
for node in directory.nodes() {
match node {
crate::directoryservice::Node::File(n) => {
files.push(n.into());
}
crate::directoryservice::Node::Directory(n) => {
directories.push(n.into());
}
crate::directoryservice::Node::Symlink(n) => {
symlinks.push(n.into());
}
}
}
Directory {
directories,
files,
symlinks,
}
}
}
@ -409,65 +292,3 @@ impl StatBlobResponse {
Ok(())
}
}
/// Struct to hold the state of an iterator over all nodes of a Directory.
///
/// Internally, this keeps peekable Iterators over all three lists of a
/// Directory message.
pub struct DirectoryNodesIterator<'a> {
// directory: &Directory,
i_directories: Peekable<std::slice::Iter<'a, DirectoryNode>>,
i_files: Peekable<std::slice::Iter<'a, FileNode>>,
i_symlinks: Peekable<std::slice::Iter<'a, SymlinkNode>>,
}
/// looks at two elements implementing NamedNode, and returns true if "left
/// is smaller / comes first".
///
/// Some(_) is preferred over None.
fn left_name_lt_right<A: NamedNode, B: NamedNode>(left: Option<&A>, right: Option<&B>) -> bool {
match left {
// if left is None, right always wins
None => false,
Some(left_inner) => {
// left is Some.
match right {
// left is Some, right is None - left wins.
None => true,
Some(right_inner) => {
// both are Some - compare the name.
return left_inner.get_name() < right_inner.get_name();
}
}
}
}
}
impl Iterator for DirectoryNodesIterator<'_> {
type Item = node::Node;
// next returns the next node in the Directory.
// we peek at all three internal iterators, and pick the one with the
// smallest name, to ensure lexicographical ordering.
// The individual lists are already known to be sorted.
fn next(&mut self) -> Option<Self::Item> {
if left_name_lt_right(self.i_directories.peek(), self.i_files.peek()) {
// i_directories is still in the game, compare with symlinks
if left_name_lt_right(self.i_directories.peek(), self.i_symlinks.peek()) {
self.i_directories
.next()
.cloned()
.map(node::Node::Directory)
} else {
self.i_symlinks.next().cloned().map(node::Node::Symlink)
}
} else {
// i_files is still in the game, compare with symlinks
if left_name_lt_right(self.i_files.peek(), self.i_symlinks.peek()) {
self.i_files.next().cloned().map(node::Node::File)
} else {
self.i_symlinks.next().cloned().map(node::Node::Symlink)
}
}
}
}

View file

@ -1,7 +1,5 @@
use crate::proto::{
node, Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDirectoryError,
ValidateNodeError,
};
use crate::proto::{Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDirectoryError};
use crate::ValidateNodeError;
use hex_literal::hex;
@ -149,7 +147,7 @@ fn digest() {
#[test]
fn validate_empty() {
let d = Directory::default();
assert_eq!(d.validate(), Ok(()));
assert!(crate::directoryservice::Directory::try_from(d).is_ok());
}
#[test]
@ -163,7 +161,7 @@ fn validate_invalid_names() {
}],
..Default::default()
};
match d.validate().expect_err("must fail") {
match crate::directoryservice::Directory::try_from(d).expect_err("must fail") {
ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => {
assert_eq!(n, b"")
}
@ -180,7 +178,7 @@ fn validate_invalid_names() {
}],
..Default::default()
};
match d.validate().expect_err("must fail") {
match crate::directoryservice::Directory::try_from(d).expect_err("must fail") {
ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => {
assert_eq!(n, b".")
}
@ -198,7 +196,7 @@ fn validate_invalid_names() {
}],
..Default::default()
};
match d.validate().expect_err("must fail") {
match crate::directoryservice::Directory::try_from(d).expect_err("must fail") {
ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => {
assert_eq!(n, b"..")
}
@ -214,7 +212,7 @@ fn validate_invalid_names() {
}],
..Default::default()
};
match d.validate().expect_err("must fail") {
match crate::directoryservice::Directory::try_from(d).expect_err("must fail") {
ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => {
assert_eq!(n, b"\x00")
}
@ -230,7 +228,7 @@ fn validate_invalid_names() {
}],
..Default::default()
};
match d.validate().expect_err("must fail") {
match crate::directoryservice::Directory::try_from(d).expect_err("must fail") {
ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => {
assert_eq!(n, b"foo/bar")
}
@ -249,7 +247,7 @@ fn validate_invalid_digest() {
}],
..Default::default()
};
match d.validate().expect_err("must fail") {
match crate::directoryservice::Directory::try_from(d).expect_err("must fail") {
ValidateDirectoryError::InvalidNode(_, ValidateNodeError::InvalidDigestLen(n)) => {
assert_eq!(n, 2)
}
@ -276,7 +274,7 @@ fn validate_sorting() {
],
..Default::default()
};
match d.validate().expect_err("must fail") {
match crate::directoryservice::Directory::try_from(d).expect_err("must fail") {
ValidateDirectoryError::WrongSorting(s) => {
assert_eq!(s, b"a");
}
@ -301,7 +299,7 @@ fn validate_sorting() {
],
..Default::default()
};
match d.validate().expect_err("must fail") {
match crate::directoryservice::Directory::try_from(d).expect_err("must fail") {
ValidateDirectoryError::DuplicateName(s) => {
assert_eq!(s, b"a");
}
@ -327,7 +325,7 @@ fn validate_sorting() {
..Default::default()
};
d.validate().expect("validate shouldn't error");
crate::directoryservice::Directory::try_from(d).expect("validate shouldn't error");
}
// [b, c] and [a] are both properly sorted.
@ -352,101 +350,6 @@ fn validate_sorting() {
..Default::default()
};
d.validate().expect("validate shouldn't error");
crate::directoryservice::Directory::try_from(d).expect("validate shouldn't error");
}
}
#[test]
fn validate_overflow() {
let d = Directory {
directories: vec![DirectoryNode {
name: "foo".into(),
digest: DUMMY_DIGEST.to_vec().into(),
size: u64::MAX,
}],
..Default::default()
};
match d.validate().expect_err("must fail") {
ValidateDirectoryError::SizeOverflow => {}
_ => panic!("unexpected error"),
}
}
#[test]
fn add_nodes_to_directory() {
let mut d = Directory {
..Default::default()
};
d.add(node::Node::Directory(DirectoryNode {
name: "b".into(),
digest: DUMMY_DIGEST.to_vec().into(),
size: 1,
}));
d.add(node::Node::Directory(DirectoryNode {
name: "a".into(),
digest: DUMMY_DIGEST.to_vec().into(),
size: 1,
}));
d.add(node::Node::Directory(DirectoryNode {
name: "z".into(),
digest: DUMMY_DIGEST.to_vec().into(),
size: 1,
}));
d.add(node::Node::File(FileNode {
name: "f".into(),
digest: DUMMY_DIGEST.to_vec().into(),
size: 1,
executable: true,
}));
d.add(node::Node::File(FileNode {
name: "c".into(),
digest: DUMMY_DIGEST.to_vec().into(),
size: 1,
executable: true,
}));
d.add(node::Node::File(FileNode {
name: "g".into(),
digest: DUMMY_DIGEST.to_vec().into(),
size: 1,
executable: true,
}));
d.add(node::Node::Symlink(SymlinkNode {
name: "t".into(),
target: "a".into(),
}));
d.add(node::Node::Symlink(SymlinkNode {
name: "o".into(),
target: "a".into(),
}));
d.add(node::Node::Symlink(SymlinkNode {
name: "e".into(),
target: "a".into(),
}));
d.validate().expect("directory should be valid");
}
#[test]
#[cfg_attr(not(debug_assertions), ignore)]
#[should_panic = "name already exists in directories"]
fn add_duplicate_node_to_directory_panics() {
let mut d = Directory {
..Default::default()
};
d.add(node::Node::Directory(DirectoryNode {
name: "a".into(),
digest: DUMMY_DIGEST.to_vec().into(),
size: 1,
}));
d.add(node::Node::File(FileNode {
name: "a".into(),
digest: DUMMY_DIGEST.to_vec().into(),
size: 1,
executable: true,
}));
}

View file

@ -1,78 +0,0 @@
use crate::proto::Directory;
use crate::proto::DirectoryNode;
use crate::proto::FileNode;
use crate::proto::NamedNode;
use crate::proto::SymlinkNode;
#[test]
fn iterator() {
let d = Directory {
directories: vec![
DirectoryNode {
name: "c".into(),
..DirectoryNode::default()
},
DirectoryNode {
name: "d".into(),
..DirectoryNode::default()
},
DirectoryNode {
name: "h".into(),
..DirectoryNode::default()
},
DirectoryNode {
name: "l".into(),
..DirectoryNode::default()
},
],
files: vec![
FileNode {
name: "b".into(),
..FileNode::default()
},
FileNode {
name: "e".into(),
..FileNode::default()
},
FileNode {
name: "g".into(),
..FileNode::default()
},
FileNode {
name: "j".into(),
..FileNode::default()
},
],
symlinks: vec![
SymlinkNode {
name: "a".into(),
..SymlinkNode::default()
},
SymlinkNode {
name: "f".into(),
..SymlinkNode::default()
},
SymlinkNode {
name: "i".into(),
..SymlinkNode::default()
},
SymlinkNode {
name: "k".into(),
..SymlinkNode::default()
},
],
};
// We keep this strings here and convert to string to make the comparison
// less messy.
let mut node_names: Vec<String> = vec![];
for node in d.nodes() {
node_names.push(String::from_utf8(node.get_name().to_vec()).unwrap());
}
assert_eq!(
vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"],
node_names
);
}

View file

@ -1,2 +1 @@
mod directory;
mod directory_nodes_iterator;

View file

@ -1,5 +1,6 @@
use crate::blobservice::{self, BlobService};
use crate::directoryservice;
use crate::directoryservice::{DirectoryNode, Node, SymlinkNode};
use crate::fixtures::*;
use crate::import::fs::ingest_path;
use crate::proto;
@ -33,10 +34,9 @@ async fn symlink() {
.expect("must succeed");
assert_eq!(
proto::node::Node::Symlink(proto::SymlinkNode {
name: "doesntmatter".into(),
target: "/nix/store/somewhereelse".into(),
}),
Node::Symlink(
SymlinkNode::new("doesntmatter".into(), "/nix/store/somewhereelse".into(),).unwrap()
),
root_node,
)
}
@ -65,7 +65,7 @@ async fn single_file() {
size: HELLOWORLD_BLOB_CONTENTS.len() as u64,
executable: false,
}),
root_node,
(&root_node).into(),
);
// ensure the blob has been uploaded
@ -95,17 +95,20 @@ async fn complicated() {
// ensure root_node matched expectations
assert_eq!(
proto::node::Node::Directory(proto::DirectoryNode {
name: tmpdir
.path()
.file_name()
.unwrap()
.as_bytes()
.to_owned()
.into(),
digest: DIRECTORY_COMPLICATED.digest().into(),
size: DIRECTORY_COMPLICATED.size(),
}),
Node::Directory(
DirectoryNode::new(
tmpdir
.path()
.file_name()
.unwrap()
.as_bytes()
.to_owned()
.into(),
DIRECTORY_COMPLICATED.digest().clone(),
DIRECTORY_COMPLICATED.size(),
)
.unwrap()
),
root_node,
);

View file

@ -179,9 +179,7 @@ pub(crate) mod derivation_builtins {
use nix_compat::nixhash::CAHash;
use nix_compat::store_path::{build_ca_path, hash_placeholder};
use sha2::Sha256;
use tvix_castore::proto as castorepb;
use tvix_castore::proto::node::Node;
use tvix_castore::proto::FileNode;
use tvix_castore::directoryservice::{FileNode, Node};
use tvix_eval::generators::Gen;
use tvix_eval::{NixContext, NixContextElement, NixString};
use tvix_store::proto::{NarInfo, PathInfo};
@ -579,12 +577,10 @@ pub(crate) mod derivation_builtins {
})
.map_err(DerivationError::InvalidDerivation)?;
let root_node = Node::File(FileNode {
name: store_path.to_string().into(),
digest: blob_digest.into(),
size: blob_size,
executable: false,
});
let root_node = Node::File(
FileNode::new(store_path.to_string().into(), blob_digest, blob_size, false)
.map_err(|e| ErrorKind::TvixError(Rc::new(e)))?,
);
// calculate the nar hash
let (nar_size, nar_sha256) = state
@ -604,9 +600,7 @@ pub(crate) mod derivation_builtins {
state
.path_info_service
.put(PathInfo {
node: Some(castorepb::Node {
node: Some(root_node),
}),
node: Some((&root_node).into()),
references: reference_paths
.iter()
.map(|x| bytes::Bytes::copy_from_slice(x.digest()))

View file

@ -2,6 +2,8 @@
use crate::builtins::errors::ImportError;
use std::path::Path;
use tvix_castore::directoryservice::FileNode;
use tvix_castore::directoryservice::Node;
use tvix_castore::import::ingest_entries;
use tvix_eval::{
builtin_macros::builtins,
@ -16,7 +18,7 @@ async fn filtered_ingest(
co: GenCo,
path: &Path,
filter: Option<&Value>,
) -> Result<tvix_castore::proto::node::Node, ErrorKind> {
) -> Result<Node, ErrorKind> {
let mut entries: Vec<walkdir::DirEntry> = vec![];
let mut it = walkdir::WalkDir::new(path)
.follow_links(false)
@ -114,8 +116,6 @@ mod import_builtins {
use nix_compat::store_path::StorePath;
use sha2::Digest;
use tokio::io::AsyncWriteExt;
use tvix_castore::proto::node::Node;
use tvix_castore::proto::FileNode;
use tvix_eval::builtins::coerce_value_to_path;
use tvix_eval::generators::Gen;
use tvix_eval::{generators::GenCo, ErrorKind, Value};
@ -214,13 +214,16 @@ mod import_builtins {
.tokio_handle
.block_on(async { blob_writer.close().await })?;
let root_node = Node::File(FileNode {
// The name gets set further down, while constructing the PathInfo.
name: "".into(),
digest: blob_digest.into(),
size: blob_size,
executable: false,
});
let root_node = Node::File(
FileNode::new(
// The name gets set further down, while constructing the PathInfo.
"".into(),
blob_digest,
blob_size,
false,
)
.map_err(|e| tvix_eval::ErrorKind::TvixError(Rc::new(e)))?,
);
let ca_hash = if recursive_ingestion {
let (_nar_size, nar_sha256) = state

View file

@ -12,8 +12,8 @@ use tracing::{instrument, warn, Span};
use tracing_indicatif::span_ext::IndicatifSpanExt;
use tvix_castore::{
blobservice::BlobService,
directoryservice::DirectoryService,
proto::{node::Node, FileNode},
directoryservice::{DirectoryService, FileNode, Node},
ValidateNodeError,
};
use tvix_store::{nar::NarCalculationService, pathinfoservice::PathInfoService, proto::PathInfo};
use url::Url;
@ -331,12 +331,10 @@ where
// Construct and return the FileNode describing the downloaded contents.
Ok((
Node::File(FileNode {
name: vec![].into(),
digest: blob_writer.close().await?.into(),
size: blob_size,
executable: false,
}),
Node::File(
FileNode::new(vec![].into(), blob_writer.close().await?, blob_size, false)
.map_err(|e| FetcherError::Io(std::io::Error::other(e.to_string())))?,
),
CAHash::Flat(actual_hash),
blob_size,
))
@ -531,12 +529,13 @@ where
// Construct and return the FileNode describing the downloaded contents,
// make it executable.
let root_node = Node::File(FileNode {
name: vec![].into(),
digest: blob_digest.into(),
size: file_size,
executable: true,
});
let root_node = Node::File(
FileNode::new(vec![].into(), blob_digest, file_size, true).map_err(
|e: ValidateNodeError| {
FetcherError::Io(std::io::Error::other(e.to_string()))
},
)?,
);
Ok((root_node, CAHash::Nar(actual_hash), file_size))
}
@ -580,7 +579,7 @@ where
// Construct the PathInfo and persist it.
let path_info = PathInfo {
node: Some(tvix_castore::proto::Node { node: Some(node) }),
node: Some((&node).into()),
references: vec![],
narinfo: Some(tvix_store::proto::NarInfo {
nar_size,
@ -598,7 +597,14 @@ where
.await
.map_err(|e| FetcherError::Io(e.into()))?;
Ok((store_path, path_info.node.unwrap().node.unwrap()))
Ok((
store_path,
(&path_info.node.unwrap().node.unwrap())
.try_into()
.map_err(|e: ValidateNodeError| {
FetcherError::Io(std::io::Error::other(e.to_string()))
})?,
))
}
}

View file

@ -10,7 +10,7 @@ use tvix_build::proto::{
build_request::{AdditionalFile, BuildConstraints, EnvVar},
BuildRequest,
};
use tvix_castore::proto::{self, node::Node};
use tvix_castore::directoryservice::Node;
/// These are the environment variables that Nix sets in its sandbox for every
/// build.
@ -109,10 +109,7 @@ pub(crate) fn derivation_to_build_request(
.into_iter()
.map(|(key, value)| EnvVar { key, value })
.collect(),
inputs: inputs
.into_iter()
.map(|n| proto::Node { node: Some(n) })
.collect(),
inputs: inputs.iter().map(Into::into).collect(),
inputs_dir: nix_compat::store_path::STORE_DIR[1..].into(),
constraints,
working_dir: "build".into(),
@ -200,10 +197,8 @@ mod test {
build_request::{AdditionalFile, BuildConstraints, EnvVar},
BuildRequest,
};
use tvix_castore::{
fixtures::DUMMY_DIGEST,
proto::{self, node::Node, DirectoryNode},
};
use tvix_castore::directoryservice::{DirectoryNode, Node};
use tvix_castore::fixtures::DUMMY_DIGEST;
use crate::tvix_build::NIX_ENVIRONMENT_VARS;
@ -211,11 +206,14 @@ mod test {
use lazy_static::lazy_static;
lazy_static! {
static ref INPUT_NODE_FOO: Node = Node::Directory(DirectoryNode {
name: Bytes::from("mp57d33657rf34lzvlbpfa1gjfv5gmpg-bar"),
digest: DUMMY_DIGEST.clone().into(),
size: 42,
});
static ref INPUT_NODE_FOO: Node = Node::Directory(
DirectoryNode::new(
Bytes::from("mp57d33657rf34lzvlbpfa1gjfv5gmpg-bar"),
DUMMY_DIGEST.clone(),
42,
)
.unwrap()
);
}
#[test]
@ -263,9 +261,7 @@ mod test {
command_args: vec![":".into()],
outputs: vec!["nix/store/fhaj6gmwns62s6ypkcldbaj2ybvkhx3p-foo".into()],
environment_vars: expected_environment_vars,
inputs: vec![proto::Node {
node: Some(INPUT_NODE_FOO.clone())
}],
inputs: vec![(&*INPUT_NODE_FOO).into()],
inputs_dir: "nix/store".into(),
constraints: Some(BuildConstraints {
system: derivation.system.clone(),

View file

@ -15,15 +15,12 @@ use tokio_util::io::SyncIoBridge;
use tracing::{error, instrument, warn, Level, Span};
use tracing_indicatif::span_ext::IndicatifSpanExt;
use tvix_build::buildservice::BuildService;
use tvix_castore::proto::node::Node;
use tvix_eval::{EvalIO, FileType, StdIO};
use tvix_store::nar::NarCalculationService;
use tvix_castore::{
blobservice::BlobService,
directoryservice::{self, DirectoryService},
proto::NamedNode,
B3Digest,
directoryservice::{self, DirectoryService, NamedNode, Node},
};
use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo};
@ -122,7 +119,12 @@ impl TvixStoreIO {
.await?
{
// if we have a PathInfo, we know there will be a root_node (due to validation)
Some(path_info) => path_info.node.expect("no node").node.expect("no node"),
Some(path_info) => path_info
.node
.as_ref()
.expect("no node")
.try_into()
.expect("invalid node"),
// If there's no PathInfo found, this normally means we have to
// trigger the build (and insert into PathInfoService, after
// reference scanning).
@ -284,19 +286,17 @@ impl TvixStoreIO {
// For each output, insert a PathInfo.
for output in &build_result.outputs {
let root_node = output.node.as_ref().expect("invalid root node");
let root_node = output.try_into().expect("invalid root node");
// calculate the nar representation
let (nar_size, nar_sha256) = self
.nar_calculation_service
.calculate_nar(root_node)
.calculate_nar(&root_node)
.await?;
// assemble the PathInfo to persist
let path_info = PathInfo {
node: Some(tvix_castore::proto::Node {
node: Some(root_node.clone()),
}),
node: Some((&root_node).into()),
references: vec![], // TODO: refscan
narinfo: Some(tvix_store::proto::NarInfo {
nar_size,
@ -332,13 +332,11 @@ impl TvixStoreIO {
build_result
.outputs
.into_iter()
.map(|output_node| Node::try_from(&output_node).expect("invalid node"))
.find(|output_node| {
output_node.node.as_ref().expect("invalid node").get_name()
== store_path.to_string().as_bytes()
output_node.get_name() == store_path.to_string().as_bytes()
})
.expect("build didn't produce the store path")
.node
.expect("invalid node")
}
}
}
@ -460,20 +458,12 @@ impl EvalIO for TvixStoreIO {
))
}
Node::File(file_node) => {
let digest: B3Digest =
file_node.digest.clone().try_into().map_err(|_e| {
error!(
file_node = ?file_node,
"invalid digest"
);
io::Error::new(
io::ErrorKind::InvalidData,
format!("invalid digest length in file node: {:?}", file_node),
)
})?;
self.tokio_handle.block_on(async {
let resp = self.blob_service.as_ref().open_read(&digest).await?;
let resp = self
.blob_service
.as_ref()
.open_read(file_node.digest())
.await?;
match resp {
Some(blob_reader) => {
// The VM Response needs a sync [std::io::Reader].
@ -482,12 +472,12 @@ impl EvalIO for TvixStoreIO {
}
None => {
error!(
blob.digest = %digest,
blob.digest = %file_node.digest(),
"blob not found",
);
Err(io::Error::new(
io::ErrorKind::NotFound,
format!("blob {} not found", &digest),
format!("blob {} not found", &file_node.digest()),
))
}
}
@ -543,16 +533,7 @@ impl EvalIO for TvixStoreIO {
match node {
Node::Directory(directory_node) => {
// fetch the Directory itself.
let digest: B3Digest =
directory_node.digest.clone().try_into().map_err(|_e| {
io::Error::new(
io::ErrorKind::InvalidData,
format!(
"invalid digest length in directory node: {:?}",
directory_node
),
)
})?;
let digest = directory_node.digest().clone();
if let Some(directory) = self.tokio_handle.block_on(async {
self.directory_service.as_ref().get(&digest).await
@ -560,9 +541,11 @@ impl EvalIO for TvixStoreIO {
let mut children: Vec<(bytes::Bytes, FileType)> = Vec::new();
for node in directory.nodes() {
children.push(match node {
Node::Directory(e) => (e.name, FileType::Directory),
Node::File(e) => (e.name, FileType::Regular),
Node::Symlink(e) => (e.name, FileType::Symlink),
Node::Directory(e) => {
(e.get_name().clone(), FileType::Directory)
}
Node::File(e) => (e.get_name().clone(), FileType::Regular),
Node::Symlink(e) => (e.get_name().clone(), FileType::Symlink),
})
}
Ok(children)

View file

@ -46,25 +46,20 @@ pub async fn get(
}
// parse the proto
let mut root_node: tvix_castore::proto::Node = Message::decode(Bytes::from(root_node_proto))
let root_node: tvix_castore::proto::Node = Message::decode(Bytes::from(root_node_proto))
.map_err(|e| {
warn!(err=%e, "unable to decode root node proto");
StatusCode::NOT_FOUND
})?;
let root_node: tvix_castore::directoryservice::Node = (&root_node).try_into().map_err(|e| {
warn!(err=%e, "root node validation failed");
StatusCode::BAD_REQUEST
})?;
// validate the node, but add a dummy node name, as we only send unnamed
// nodes
if let Some(rn) = root_node.node {
root_node.node = Some(rn.rename("00000000000000000000000000000000-dummy".into()))
}
let root_node = root_node
.validate()
.map_err(|e| {
warn!(err=%e, "root node validation failed");
StatusCode::BAD_REQUEST
})?
.to_owned();
let root_node = root_node.rename("00000000000000000000000000000000-dummy".into());
let (w, r) = tokio::io::duplex(1024 * 8);
@ -130,7 +125,7 @@ pub async fn put(
// store mapping of narhash to root node into root_nodes.
// we need it later to populate the root node when accepting the PathInfo.
root_nodes.write().put(nar_hash_actual, root_node);
root_nodes.write().put(nar_hash_actual, (&root_node).into());
Ok("")
}

View file

@ -61,22 +61,20 @@ pub async fn get(
StatusCode::INTERNAL_SERVER_ERROR
})?;
let mut narinfo = path_info.to_narinfo(store_path).ok_or_else(|| {
let mut narinfo = path_info.to_narinfo(store_path.as_ref()).ok_or_else(|| {
warn!(path_info=?path_info, "PathInfo contained no NAR data");
StatusCode::INTERNAL_SERVER_ERROR
})?;
// encode the (unnamed) root node in the NAR url itself.
let root_node = path_info
.node
.as_ref()
.and_then(|n| n.node.as_ref())
.expect("root node must not be none")
.clone()
.rename("".into());
let root_node = tvix_castore::directoryservice::Node::try_from(
path_info.node.as_ref().expect("root node must not be none"),
)
.unwrap() // PathInfo is validated
.rename("".into());
let mut buf = Vec::new();
Node::encode(&root_node, &mut buf);
Node::encode(&(&root_node).into(), &mut buf);
let url = format!(
"nar/tvix-castore/{}?narsize={}",
@ -128,10 +126,10 @@ pub async fn put(
// Lookup root node with peek, as we don't want to update the LRU list.
// We need to be careful to not hold the RwLock across the await point.
let maybe_root_node = root_nodes
let maybe_root_node: Option<tvix_castore::directoryservice::Node> = root_nodes
.read()
.peek(&narinfo.nar_hash)
.map(|v| v.to_owned());
.and_then(|v| v.try_into().ok());
match maybe_root_node {
Some(root_node) => {
@ -139,7 +137,7 @@ pub async fn put(
// We need to rename the node to the narinfo storepath basename, as
// that's where it's stored in PathInfo.
pathinfo.node = Some(castorepb::Node {
node: Some(root_node.rename(narinfo.store_path.to_string().into())),
node: Some((&root_node.rename(narinfo.store_path.to_string().into())).into()),
});
// Persist the PathInfo.

View file

@ -352,7 +352,7 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
// annotated with information we have from the reference graph.
let path_info = PathInfo {
node: Some(tvix_castore::proto::Node {
node: Some(root_node),
node: Some((&root_node).into()),
}),
references: Vec::from_iter(
elem.references.iter().map(|e| e.digest().to_vec().into()),

View file

@ -1,8 +1,9 @@
use std::path::Path;
use tracing::{debug, instrument};
use tvix_castore::{
blobservice::BlobService, directoryservice::DirectoryService, import::fs::ingest_path,
proto::node::Node, B3Digest,
blobservice::BlobService,
directoryservice::{DirectoryService, NamedNode, Node},
import::fs::ingest_path,
};
use nix_compat::{
@ -32,24 +33,24 @@ pub fn log_node(node: &Node, path: &Path) {
Node::Directory(directory_node) => {
debug!(
path = ?path,
name = ?directory_node.name,
digest = %B3Digest::try_from(directory_node.digest.clone()).unwrap(),
name = ?directory_node.get_name(),
digest = %directory_node.digest(),
"import successful",
)
}
Node::File(file_node) => {
debug!(
path = ?path,
name = ?file_node.name,
digest = %B3Digest::try_from(file_node.digest.clone()).unwrap(),
name = ?file_node.get_name(),
digest = %file_node.digest(),
"import successful"
)
}
Node::Symlink(symlink_node) => {
debug!(
path = ?path,
name = ?symlink_node.name,
target = ?symlink_node.target,
name = ?symlink_node.get_name(),
target = ?symlink_node.target(),
"import successful"
)
}
@ -87,7 +88,7 @@ pub fn derive_nar_ca_path_info(
// assemble the [crate::proto::PathInfo] object.
PathInfo {
node: Some(tvix_castore::proto::Node {
node: Some(root_node),
node: Some((&root_node).into()),
}),
// There's no reference scanning on path contents ingested like this.
references: vec![],

View file

@ -7,12 +7,11 @@ use tokio::{
};
use tvix_castore::{
blobservice::BlobService,
directoryservice::DirectoryService,
directoryservice::{DirectoryService, NamedNode, Node},
import::{
blobs::{self, ConcurrentBlobUploader},
ingest_entries, IngestionEntry, IngestionError,
},
proto::{node::Node, NamedNode},
PathBuf,
};
@ -99,7 +98,7 @@ where
let (_, node) = try_join!(produce, consume)?;
// remove the fake "root" name again
debug_assert_eq!(&node.get_name(), b"root");
debug_assert_eq!(&node.get_name()[..], b"root");
Ok(node.rename("".into()))
}
@ -172,12 +171,13 @@ mod test {
use rstest::*;
use tokio_stream::StreamExt;
use tvix_castore::blobservice::BlobService;
use tvix_castore::directoryservice::DirectoryService;
use tvix_castore::directoryservice::{
Directory, DirectoryNode, DirectoryService, FileNode, Node, SymlinkNode,
};
use tvix_castore::fixtures::{
DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST, HELLOWORLD_BLOB_CONTENTS,
HELLOWORLD_BLOB_DIGEST,
};
use tvix_castore::proto as castorepb;
use crate::tests::fixtures::{
blob_service, directory_service, NAR_CONTENTS_COMPLICATED, NAR_CONTENTS_HELLOWORLD,
@ -199,10 +199,13 @@ mod test {
.expect("must parse");
assert_eq!(
castorepb::node::Node::Symlink(castorepb::SymlinkNode {
name: "".into(), // name must be empty
target: "/nix/store/somewhereelse".into(),
}),
Node::Symlink(
SymlinkNode::new(
"".into(), // name must be empty
"/nix/store/somewhereelse".into(),
)
.unwrap()
),
root_node
);
}
@ -222,12 +225,15 @@ mod test {
.expect("must parse");
assert_eq!(
castorepb::node::Node::File(castorepb::FileNode {
name: "".into(), // name must be empty
digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
size: HELLOWORLD_BLOB_CONTENTS.len() as u64,
executable: false,
}),
Node::File(
FileNode::new(
"".into(), // name must be empty
HELLOWORLD_BLOB_DIGEST.clone(),
HELLOWORLD_BLOB_CONTENTS.len() as u64,
false,
)
.unwrap()
),
root_node
);
@ -250,11 +256,14 @@ mod test {
.expect("must parse");
assert_eq!(
castorepb::node::Node::Directory(castorepb::DirectoryNode {
name: "".into(), // name must be empty
digest: DIRECTORY_COMPLICATED.digest().into(),
size: DIRECTORY_COMPLICATED.size(),
}),
Node::Directory(
DirectoryNode::new(
"".into(), // name must be empty
DIRECTORY_COMPLICATED.digest(),
DIRECTORY_COMPLICATED.size(),
)
.unwrap()
),
root_node,
);
@ -262,7 +271,7 @@ mod test {
assert!(blob_service.has(&EMPTY_BLOB_DIGEST).await.unwrap());
// directoryservice must contain the directories, at least with get_recursive.
let resp: Result<Vec<castorepb::Directory>, _> = directory_service
let resp: Result<Vec<Directory>, _> = directory_service
.get_recursive(&DIRECTORY_COMPLICATED.digest())
.collect()
.await;

View file

@ -8,16 +8,14 @@ pub use import::ingest_nar_and_hash;
pub use renderer::calculate_size_and_sha256;
pub use renderer::write_nar;
pub use renderer::SimpleRenderer;
use tvix_castore::proto as castorepb;
use tvix_castore::directoryservice::Node;
#[async_trait]
pub trait NarCalculationService: Send + Sync {
/// Return the nar size and nar sha256 digest for a given root node.
/// This can be used to calculate NAR-based output paths.
async fn calculate_nar(
&self,
root_node: &castorepb::node::Node,
) -> Result<(u64, [u8; 32]), tvix_castore::Error>;
async fn calculate_nar(&self, root_node: &Node)
-> Result<(u64, [u8; 32]), tvix_castore::Error>;
}
#[async_trait]
@ -27,7 +25,7 @@ where
{
async fn calculate_nar(
&self,
root_node: &castorepb::node::Node,
root_node: &Node,
) -> Result<(u64, [u8; 32]), tvix_castore::Error> {
self.as_ref().calculate_nar(root_node).await
}

View file

@ -10,8 +10,7 @@ use tracing::{instrument, Span};
use tracing_indicatif::span_ext::IndicatifSpanExt;
use tvix_castore::{
blobservice::BlobService,
directoryservice::DirectoryService,
proto::{self as castorepb, NamedNode},
directoryservice::{DirectoryService, NamedNode, Node},
};
pub struct SimpleRenderer<BS, DS> {
@ -36,7 +35,7 @@ where
{
async fn calculate_nar(
&self,
root_node: &castorepb::node::Node,
root_node: &Node,
) -> Result<(u64, [u8; 32]), tvix_castore::Error> {
calculate_size_and_sha256(
root_node,
@ -52,7 +51,7 @@ where
/// NAR output.
#[instrument(skip_all, fields(indicatif.pb_show=1))]
pub async fn calculate_size_and_sha256<BS, DS>(
root_node: &castorepb::node::Node,
root_node: &Node,
blob_service: BS,
directory_service: DS,
) -> Result<(u64, [u8; 32]), RenderError>
@ -80,13 +79,13 @@ where
Ok((cw.count(), h.finalize().into()))
}
/// Accepts a [castorepb::node::Node] pointing to the root of a (store) path,
/// Accepts a [Node] pointing to the root of a (store) path,
/// and uses the passed blob_service and directory_service to perform the
/// necessary lookups as it traverses the structure.
/// The contents in NAR serialization are writen to the passed [AsyncWrite].
pub async fn write_nar<W, BS, DS>(
mut w: W,
proto_root_node: &castorepb::node::Node,
proto_root_node: &Node,
blob_service: BS,
directory_service: DS,
) -> Result<(), RenderError>
@ -115,7 +114,7 @@ where
/// This consumes the node.
async fn walk_node<BS, DS>(
nar_node: nar_writer::Node<'_, '_>,
proto_node: &castorepb::node::Node,
proto_node: &Node,
blob_service: BS,
directory_service: DS,
) -> Result<(BS, DS), RenderError>
@ -124,23 +123,17 @@ where
DS: DirectoryService + Send,
{
match proto_node {
castorepb::node::Node::Symlink(proto_symlink_node) => {
Node::Symlink(proto_symlink_node) => {
nar_node
.symlink(&proto_symlink_node.target)
.symlink(proto_symlink_node.target())
.await
.map_err(RenderError::NARWriterError)?;
}
castorepb::node::Node::File(proto_file_node) => {
let digest_len = proto_file_node.digest.len();
let digest = proto_file_node.digest.clone().try_into().map_err(|_| {
RenderError::StoreError(io::Error::new(
io::ErrorKind::Other,
format!("invalid digest len {} in file node", digest_len),
))
})?;
Node::File(proto_file_node) => {
let digest = proto_file_node.digest();
let mut blob_reader = match blob_service
.open_read(&digest)
.open_read(digest)
.await
.map_err(RenderError::StoreError)?
{
@ -153,36 +146,24 @@ where
nar_node
.file(
proto_file_node.executable,
proto_file_node.size,
proto_file_node.executable(),
proto_file_node.size(),
&mut blob_reader,
)
.await
.map_err(RenderError::NARWriterError)?;
}
castorepb::node::Node::Directory(proto_directory_node) => {
let digest_len = proto_directory_node.digest.len();
let digest = proto_directory_node
.digest
.clone()
.try_into()
.map_err(|_| {
RenderError::StoreError(io::Error::new(
io::ErrorKind::InvalidData,
format!("invalid digest len {} in directory node", digest_len),
))
})?;
Node::Directory(proto_directory_node) => {
// look it up with the directory service
match directory_service
.get(&digest)
.get(proto_directory_node.digest())
.await
.map_err(|e| RenderError::StoreError(e.into()))?
{
// if it's None, that's an error!
None => Err(RenderError::DirectoryNotFound(
digest,
proto_directory_node.name.clone(),
proto_directory_node.digest().clone(),
proto_directory_node.get_name().clone(),
))?,
Some(proto_directory) => {
// start a directory node
@ -206,7 +187,7 @@ where
(blob_service, directory_service) = Box::pin(walk_node(
child_node,
&proto_node,
proto_node,
blob_service,
directory_service,
))

View file

@ -1,10 +1,10 @@
use futures::stream::BoxStream;
use futures::StreamExt;
use tonic::async_trait;
use tvix_castore::directoryservice::Node;
use tvix_castore::fs::{RootNodes, TvixStoreFs};
use tvix_castore::proto as castorepb;
use tvix_castore::Error;
use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
use tvix_castore::{Error, ValidateNodeError};
use super::PathInfoService;
@ -48,7 +48,7 @@ impl<T> RootNodes for RootNodesWrapper<T>
where
T: AsRef<dyn PathInfoService> + Send + Sync,
{
async fn get_by_basename(&self, name: &[u8]) -> Result<Option<castorepb::node::Node>, Error> {
async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error> {
let Ok(store_path) = nix_compat::store_path::StorePath::from_bytes(name) else {
return Ok(None);
};
@ -61,20 +61,23 @@ where
.map(|path_info| {
path_info
.node
.as_ref()
.expect("missing root node")
.node
.expect("empty node")
}))
.try_into()
.map_err(|e: ValidateNodeError| Error::StorageError(e.to_string()))
})
.transpose()?)
}
fn list(&self) -> BoxStream<Result<castorepb::node::Node, Error>> {
fn list(&self) -> BoxStream<Result<Node, Error>> {
Box::pin(self.0.as_ref().list().map(|result| {
result.map(|path_info| {
result.and_then(|path_info| {
path_info
.node
.as_ref()
.expect("missing root node")
.node
.expect("empty node")
.try_into()
.map_err(|e: ValidateNodeError| Error::StorageError(e.to_string()))
})
}))
}

View file

@ -11,7 +11,8 @@ use tonic::{async_trait, Code};
use tracing::{instrument, Span};
use tracing_indicatif::span_ext::IndicatifSpanExt;
use tvix_castore::composition::{CompositionContext, ServiceBuilder};
use tvix_castore::{proto as castorepb, Error};
use tvix_castore::directoryservice::Node;
use tvix_castore::Error;
/// Connects to a (remote) tvix-store PathInfoService over gRPC.
#[derive(Clone)]
@ -123,10 +124,7 @@ where
T::Future: Send,
{
#[instrument(level = "trace", skip_all, fields(root_node = ?root_node, indicatif.pb_show=1))]
async fn calculate_nar(
&self,
root_node: &castorepb::node::Node,
) -> Result<(u64, [u8; 32]), Error> {
async fn calculate_nar(&self, root_node: &Node) -> Result<(u64, [u8; 32]), Error> {
let span = Span::current();
span.pb_set_message("Waiting for NAR calculation");
span.pb_start();
@ -134,8 +132,8 @@ where
let path_info = self
.grpc_client
.clone()
.calculate_nar(castorepb::Node {
node: Some(root_node.clone()),
.calculate_nar(tvix_castore::proto::Node {
node: Some(root_node.into()),
})
.await
.map_err(|e| Error::StorageError(e.to_string()))?

View file

@ -109,7 +109,10 @@ mod test {
let root_node = p.node.as_mut().unwrap();
if let castorepb::Node { node: Some(node) } = root_node {
let n = node.to_owned();
*node = n.rename("11111111111111111111111111111111-dummy2".into());
*node = (&tvix_castore::directoryservice::Node::try_from(&n)
.unwrap()
.rename("11111111111111111111111111111111-dummy2".into()))
.into();
} else {
unreachable!()
}

View file

@ -230,7 +230,7 @@ where
Ok(Some(PathInfo {
node: Some(castorepb::Node {
// set the name of the root node to the digest-name of the store path.
node: Some(root_node.rename(narinfo.store_path.to_string().to_owned().into())),
node: Some((&root_node.rename(narinfo.store_path.to_string().into())).into()),
}),
references: pathinfo.references,
narinfo: pathinfo.narinfo,

View file

@ -74,24 +74,19 @@ where
&self,
request: Request<castorepb::Node>,
) -> Result<Response<proto::CalculateNarResponse>> {
match request.into_inner().node {
None => Err(Status::invalid_argument("no root node sent")),
Some(root_node) => {
if let Err(e) = root_node.validate() {
warn!(err = %e, "invalid root node");
Err(Status::invalid_argument("invalid root node"))?
}
let root_node = (&request.into_inner()).try_into().map_err(|e| {
warn!(err = %e, "invalid root node");
Status::invalid_argument("invalid root node")
})?;
match self.nar_calculation_service.calculate_nar(&root_node).await {
Ok((nar_size, nar_sha256)) => Ok(Response::new(proto::CalculateNarResponse {
nar_size,
nar_sha256: nar_sha256.to_vec().into(),
})),
Err(e) => {
warn!(err = %e, "error during NAR calculation");
Err(e.into())
}
}
match self.nar_calculation_service.calculate_nar(&root_node).await {
Ok((nar_size, nar_sha256)) => Ok(Response::new(proto::CalculateNarResponse {
nar_size,
nar_sha256: nar_sha256.to_vec().into(),
})),
Err(e) => {
warn!(err = %e, "error during NAR calculation");
Err(e.into())
}
}
}

View file

@ -9,7 +9,8 @@ use nix_compat::{
store_path::{self, StorePathRef},
};
use thiserror::Error;
use tvix_castore::proto::{self as castorepb, NamedNode, ValidateNodeError};
use tvix_castore::directoryservice::NamedNode;
use tvix_castore::ValidateNodeError;
mod grpc_pathinfoservice_wrapper;
@ -87,7 +88,7 @@ impl PathInfo {
/// validate performs some checks on the PathInfo struct,
/// Returning either a [store_path::StorePath] of the root node, or a
/// [ValidatePathInfoError].
pub fn validate(&self) -> Result<store_path::StorePathRef<'_>, ValidatePathInfoError> {
pub fn validate(&self) -> Result<store_path::StorePath, ValidatePathInfoError> {
// ensure the references have the right number of bytes.
for (i, reference) in self.references.iter().enumerate() {
if reference.len() != store_path::DIGEST_SIZE {
@ -158,14 +159,15 @@ impl PathInfo {
// Ensure there is a (root) node present, and it properly parses to a [store_path::StorePath].
let root_nix_path = match &self.node {
None | Some(castorepb::Node { node: None }) => {
Err(ValidatePathInfoError::NoNodePresent)?
}
Some(castorepb::Node { node: Some(node) }) => {
node.validate()
None => Err(ValidatePathInfoError::NoNodePresent)?,
Some(node) => {
// TODO save result somewhere
let node: tvix_castore::directoryservice::Node = node
.try_into()
.map_err(ValidatePathInfoError::InvalidRootNode)?;
// parse the name of the node itself and return
parse_node_name_root(node.get_name(), ValidatePathInfoError::InvalidNodeName)?
.to_owned()
}
};

View file

@ -3,17 +3,18 @@ use crate::tests::fixtures::*;
use bytes::Bytes;
use data_encoding::BASE64;
use nix_compat::nixbase32;
use nix_compat::store_path::{self, StorePathRef};
use nix_compat::store_path::{self, StorePath, StorePathRef};
use rstest::rstest;
use tvix_castore::proto as castorepb;
use tvix_castore::ValidateNodeError;
#[rstest]
#[case::no_node(None, Err(ValidatePathInfoError::NoNodePresent))]
#[case::no_node_2(Some(castorepb::Node { node: None}), Err(ValidatePathInfoError::NoNodePresent))]
#[case::no_node_2(Some(castorepb::Node { node: None}), Err(ValidatePathInfoError::InvalidRootNode(ValidateNodeError::NoNodeSet)))]
fn validate_pathinfo(
#[case] node: Option<castorepb::Node>,
#[case] exp_result: Result<StorePathRef, ValidatePathInfoError>,
#[case] exp_result: Result<StorePath, ValidatePathInfoError>,
) {
// construct the PathInfo object
let p = PathInfo {
@ -22,9 +23,6 @@ fn validate_pathinfo(
};
assert_eq!(exp_result, p.validate());
let err = p.validate().expect_err("validation should fail");
assert!(matches!(err, ValidatePathInfoError::NoNodePresent));
}
#[rstest]
@ -32,12 +30,12 @@ fn validate_pathinfo(
name: DUMMY_PATH.into(),
digest: DUMMY_DIGEST.clone().into(),
size: 0,
}, Ok(StorePathRef::from_bytes(DUMMY_PATH.as_bytes()).unwrap()))]
}, Ok(StorePath::from_bytes(DUMMY_PATH.as_bytes()).unwrap()))]
#[case::invalid_digest_length(castorepb::DirectoryNode {
name: DUMMY_PATH.into(),
digest: Bytes::new(),
size: 0,
}, Err(ValidatePathInfoError::InvalidRootNode(castorepb::ValidateNodeError::InvalidDigestLen(0))))]
}, Err(ValidatePathInfoError::InvalidRootNode(tvix_castore::ValidateNodeError::InvalidDigestLen(0))))]
#[case::invalid_node_name_no_storepath(castorepb::DirectoryNode {
name: "invalid".into(),
digest: DUMMY_DIGEST.clone().into(),
@ -48,7 +46,7 @@ fn validate_pathinfo(
)))]
fn validate_directory(
#[case] directory_node: castorepb::DirectoryNode,
#[case] exp_result: Result<StorePathRef, ValidatePathInfoError>,
#[case] exp_result: Result<StorePath, ValidatePathInfoError>,
) {
// construct the PathInfo object
let p = PathInfo {
@ -68,7 +66,7 @@ fn validate_directory(
size: 0,
executable: false,
},
Ok(StorePathRef::from_bytes(DUMMY_PATH.as_bytes()).unwrap())
Ok(StorePath::from_bytes(DUMMY_PATH.as_bytes()).unwrap())
)]
#[case::invalid_digest_len(
castorepb::FileNode {
@ -76,7 +74,7 @@ fn validate_directory(
digest: Bytes::new(),
..Default::default()
},
Err(ValidatePathInfoError::InvalidRootNode(castorepb::ValidateNodeError::InvalidDigestLen(0)))
Err(ValidatePathInfoError::InvalidRootNode(tvix_castore::ValidateNodeError::InvalidDigestLen(0)))
)]
#[case::invalid_node_name(
castorepb::FileNode {
@ -91,7 +89,7 @@ fn validate_directory(
)]
fn validate_file(
#[case] file_node: castorepb::FileNode,
#[case] exp_result: Result<StorePathRef, ValidatePathInfoError>,
#[case] exp_result: Result<StorePath, ValidatePathInfoError>,
) {
// construct the PathInfo object
let p = PathInfo {
@ -109,7 +107,7 @@ fn validate_file(
name: DUMMY_PATH.into(),
target: "foo".into(),
},
Ok(StorePathRef::from_bytes(DUMMY_PATH.as_bytes()).unwrap())
Ok(StorePath::from_bytes(DUMMY_PATH.as_bytes()).unwrap())
)]
#[case::invalid_node_name(
castorepb::SymlinkNode {
@ -123,7 +121,7 @@ fn validate_file(
)]
fn validate_symlink(
#[case] symlink_node: castorepb::SymlinkNode,
#[case] exp_result: Result<StorePathRef, ValidatePathInfoError>,
#[case] exp_result: Result<StorePath, ValidatePathInfoError>,
) {
// construct the PathInfo object
let p = PathInfo {
@ -233,7 +231,7 @@ fn validate_symlink_empty_target_invalid() {
target: "".into(),
});
node.validate().expect_err("must fail validation");
tvix_castore::directoryservice::Node::try_from(&node).expect_err("must fail validation");
}
/// Create a node with a symlink target including null bytes, and ensure it
@ -245,7 +243,7 @@ fn validate_symlink_target_null_byte_invalid() {
target: "foo\0".into(),
});
node.validate().expect_err("must fail validation");
tvix_castore::directoryservice::Node::try_from(&node).expect_err("must fail validation");
}
/// Create a PathInfo with a correct deriver field and ensure it succeeds.

View file

@ -9,8 +9,9 @@ use std::io;
use std::sync::Arc;
use tokio::io::sink;
use tvix_castore::blobservice::BlobService;
use tvix_castore::directoryservice::DirectoryService;
use tvix_castore::proto as castorepb;
use tvix_castore::directoryservice::{
DirectoryNode, DirectoryService, FileNode, Node, SymlinkNode,
};
#[rstest]
#[tokio::test]
@ -22,10 +23,9 @@ async fn single_symlink(
write_nar(
&mut buf,
&castorepb::node::Node::Symlink(castorepb::SymlinkNode {
name: "doesntmatter".into(),
target: "/nix/store/somewhereelse".into(),
}),
&Node::Symlink(
SymlinkNode::new("doesntmatter".into(), "/nix/store/somewhereelse".into()).unwrap(),
),
// don't put anything in the stores, as we don't actually do any requests.
blob_service,
directory_service,
@ -45,12 +45,15 @@ async fn single_file_missing_blob(
) {
let e = write_nar(
sink(),
&castorepb::node::Node::File(castorepb::FileNode {
name: "doesntmatter".into(),
digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
size: HELLOWORLD_BLOB_CONTENTS.len() as u64,
executable: false,
}),
&Node::File(
FileNode::new(
"doesntmatter".into(),
HELLOWORLD_BLOB_DIGEST.clone(),
HELLOWORLD_BLOB_CONTENTS.len() as u64,
false,
)
.unwrap(),
),
// the blobservice is empty intentionally, to provoke the error.
blob_service,
directory_service,
@ -90,12 +93,15 @@ async fn single_file_wrong_blob_size(
// Test with a root FileNode of a too big size
let e = write_nar(
sink(),
&castorepb::node::Node::File(castorepb::FileNode {
name: "doesntmatter".into(),
digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
size: 42, // <- note the wrong size here!
executable: false,
}),
&Node::File(
FileNode::new(
"doesntmatter".into(),
HELLOWORLD_BLOB_DIGEST.clone(),
42, // <- note the wrong size here!
false,
)
.unwrap(),
),
blob_service.clone(),
directory_service.clone(),
)
@ -112,12 +118,15 @@ async fn single_file_wrong_blob_size(
// Test with a root FileNode of a too small size
let e = write_nar(
sink(),
&castorepb::node::Node::File(castorepb::FileNode {
name: "doesntmatter".into(),
digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
size: 2, // <- note the wrong size here!
executable: false,
}),
&Node::File(
FileNode::new(
"doesntmatter".into(),
HELLOWORLD_BLOB_DIGEST.clone(),
2, // <- note the wrong size here!
false,
)
.unwrap(),
),
blob_service,
directory_service,
)
@ -153,12 +162,15 @@ async fn single_file(
write_nar(
&mut buf,
&castorepb::node::Node::File(castorepb::FileNode {
name: "doesntmatter".into(),
digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
size: HELLOWORLD_BLOB_CONTENTS.len() as u64,
executable: false,
}),
&Node::File(
FileNode::new(
"doesntmatter".into(),
HELLOWORLD_BLOB_DIGEST.clone(),
HELLOWORLD_BLOB_CONTENTS.len() as u64,
false,
)
.unwrap(),
),
blob_service,
directory_service,
)
@ -196,11 +208,14 @@ async fn test_complicated(
write_nar(
&mut buf,
&castorepb::node::Node::Directory(castorepb::DirectoryNode {
name: "doesntmatter".into(),
digest: DIRECTORY_COMPLICATED.digest().into(),
size: DIRECTORY_COMPLICATED.size(),
}),
&Node::Directory(
DirectoryNode::new(
"doesntmatter".into(),
DIRECTORY_COMPLICATED.digest(),
DIRECTORY_COMPLICATED.size(),
)
.unwrap(),
),
blob_service.clone(),
directory_service.clone(),
)
@ -211,11 +226,14 @@ async fn test_complicated(
// ensure calculate_nar does return the correct sha256 digest and sum.
let (nar_size, nar_digest) = calculate_size_and_sha256(
&castorepb::node::Node::Directory(castorepb::DirectoryNode {
name: "doesntmatter".into(),
digest: DIRECTORY_COMPLICATED.digest().into(),
size: DIRECTORY_COMPLICATED.size(),
}),
&Node::Directory(
DirectoryNode::new(
"doesntmatter".into(),
DIRECTORY_COMPLICATED.digest(),
DIRECTORY_COMPLICATED.size(),
)
.unwrap(),
),
blob_service,
directory_service,
)