refactor(tvix/{cli,store}): move TvixStoreIO to tvix-cli crate

The EvalIO trait is eval-specific; there's no point in dealing with these
things in tvix-store.

This implements the EvalIO interface for a Tvix store.
The proper place for this glue code (for now) is tvix-cli, which knows
about both tvix-store and tvix-eval.

There's one annoyance with this move: The `tvix-store import` subcommand
previously also used the TvixStoreIO implementation (because it
conveniently did what we wanted).
Some of this code had to be duplicated, mostly logic to calculate the
NAR-based output path and create the PathInfo object.

Some of this, and potentially more, can be extracted into helper
functions in a shared crate, and then be used from both TvixStoreIO in
tvix-cli as well as the tvix-store CLI entrypoint.

Change-Id: Ia7515e83c1b54f95baf810fbd8414c5521382d40
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9212
Tested-by: BuildkiteCI
Reviewed-by: tazjin <tazjin@tvl.su>
Autosubmit: flokli <flokli@flokli.de>
This commit is contained in:
Florian Klink 2023-09-02 21:16:35 +03:00 committed by clbot
parent 428b655845
commit 3c340b28bd
10 changed files with 113 additions and 87 deletions

2
tvix/Cargo.lock generated
View file

@ -2724,6 +2724,7 @@ dependencies = [
"smol_str",
"ssri",
"thiserror",
"tracing",
"tvix-eval",
"tvix-store",
"wu-manber",
@ -2814,7 +2815,6 @@ dependencies = [
"tower",
"tracing",
"tracing-subscriber",
"tvix-eval",
"url",
"walkdir",
]

View file

@ -8050,6 +8050,10 @@ rec {
name = "thiserror";
packageId = "thiserror";
}
{
name = "tracing";
packageId = "tracing";
}
{
name = "tvix-eval";
packageId = "tvix-eval";
@ -8392,10 +8396,6 @@ rec {
packageId = "tracing-subscriber";
features = [ "json" ];
}
{
name = "tvix-eval";
packageId = "tvix-eval";
}
{
name = "url";
packageId = "url";

View file

@ -11,14 +11,15 @@ path = "src/main.rs"
nix-compat = { path = "../nix-compat" }
tvix-store = { path = "../store", features = []}
tvix-eval = { path = "../eval" }
rustyline = "10.0.0"
bytes = "1.4.0"
clap = { version = "4.0", features = ["derive", "env"] }
data-encoding = "2.3.3"
dirs = "4.0.0"
rustyline = "10.0.0"
smol_str = "0.2.0"
ssri = "7.0.0"
data-encoding = "2.3.3"
thiserror = "1.0.38"
bytes = "1.4.0"
tracing = "0.1.37"
[dependencies.wu-manber]
git = "https://github.com/tvlfyi/wu-manber.git"

View file

@ -3,6 +3,7 @@ mod errors;
mod known_paths;
mod refscan;
mod tvix_io;
mod tvix_store_io;
use std::cell::RefCell;
use std::rc::Rc;
@ -17,6 +18,7 @@ use tvix_eval::Value;
use tvix_store::blobservice::MemoryBlobService;
use tvix_store::directoryservice::MemoryDirectoryService;
use tvix_store::pathinfoservice::MemoryPathInfoService;
use tvix_store_io::TvixStoreIO;
#[derive(Parser)]
struct Args {
@ -80,7 +82,7 @@ fn interpret(code: &str, path: Option<PathBuf>, args: &Args, explain: bool) -> b
eval.io_handle = Box::new(tvix_io::TvixIO::new(
known_paths.clone(),
tvix_store::TvixStoreIO::new(blob_service, directory_service, path_info_service),
TvixStoreIO::new(blob_service, directory_service, path_info_service),
));
// bundle fetchurl.nix (used in nixpkgs) by resolving <nix> to

View file

@ -15,6 +15,7 @@ use std::path::{Path, PathBuf};
use std::rc::Rc;
use tvix_eval::{EvalIO, FileType};
// TODO: Merge this together with TvixStoreIO?
pub(crate) struct TvixIO<T: EvalIO> {
/// Ingested paths must be reported to this known paths tracker
/// for accurate build reference scanning.

View file

@ -1,23 +1,17 @@
//! This module provides an implementation of EvalIO.
//!
//! It can be used by the tvix evaluator to talk to a tvix store.
//! This module provides an implementation of EvalIO talking to tvix-store.
use data_encoding::BASE64;
use nix_compat::{
nixhash::{HashAlgo, NixHash, NixHashWithMode},
store_path::{build_regular_ca_path, StorePath},
};
use nix_compat::store_path::{self, StorePath};
use std::{io, path::Path, path::PathBuf, sync::Arc};
use tracing::{error, instrument, warn};
use tvix_eval::{EvalIO, FileType, StdIO};
use crate::{
use tvix_store::{
blobservice::BlobService,
directoryservice::{self, DirectoryService},
import,
nar::calculate_size_and_sha256,
pathinfoservice::PathInfoService,
proto::NamedNode,
proto::{node::Node, NamedNode, NarInfo, PathInfo},
B3Digest,
};
@ -50,14 +44,15 @@ impl TvixStoreIO {
}
/// for a given [StorePath] and additional [Path] inside the store path,
/// look up the [PathInfo], and if it exists, traverse the directory structure to
/// return the [crate::proto::node::Node] specified by `sub_path`.
/// look up the [PathInfo], and if it exists, use
/// [directoryservice::traverse_to] to return the
/// [Node] specified by `sub_path`.
#[instrument(skip(self), ret, err)]
fn store_path_to_root_node(
&self,
store_path: &StorePath,
sub_path: &Path,
) -> Result<Option<crate::proto::node::Node>, crate::Error> {
) -> Result<Option<Node>, io::Error> {
let path_info = {
match self.path_info_service.get(store_path.digest)? {
// If there's no PathInfo found, early exit
@ -85,19 +80,20 @@ impl TvixStoreIO {
}
};
directoryservice::traverse_to(self.directory_service.clone(), root_node, sub_path)
Ok(directoryservice::traverse_to(
self.directory_service.clone(),
root_node,
sub_path,
)?)
}
/// Imports a given path on the filesystem into the store, and returns the
/// [crate::proto::PathInfo] describing the path, that was sent to
/// [PathInfo] describing the path, that was sent to
/// [PathInfoService].
/// While not part of the [EvalIO], it's still useful for clients who
/// care about the [PathInfo].
#[instrument(skip(self), ret, err)]
pub fn import_path_with_pathinfo(
&self,
path: &std::path::Path,
) -> Result<crate::proto::PathInfo, io::Error> {
pub fn import_path_with_pathinfo(&self, path: &std::path::Path) -> Result<PathInfo, io::Error> {
// Call [import::ingest_path], which will walk over the given path and return a root_node.
let root_node = import::ingest_path(
self.blob_service.clone(),
@ -114,33 +110,26 @@ impl TvixStoreIO {
)
.expect("error during nar calculation"); // TODO: handle error
// We populate the struct directly, as we know the sha256 digest has the
// right size.
let nar_hash_with_mode = NixHashWithMode::Recursive(NixHash {
algo: HashAlgo::Sha256,
digest: nar_sha256.to_vec(),
});
// TODO: make a path_to_name helper function?
let name = path
.file_name()
.expect("path must not be ..")
.to_str()
.expect("path must be valid unicode");
let output_path =
build_regular_ca_path(name, &nar_hash_with_mode, Vec::<String>::new(), false).unwrap();
let output_path = store_path::build_nar_based_store_path(&nar_sha256, name);
// assemble a new root_node with a name that is derived from the nar hash.
let root_node = root_node.rename(output_path.to_string().into_bytes().into());
// assemble the [crate::proto::PathInfo] object.
let path_info = crate::proto::PathInfo {
node: Some(crate::proto::Node {
// assemble the [PathInfo] object.
let path_info = PathInfo {
node: Some(tvix_store::proto::Node {
node: Some(root_node),
}),
// There's no reference scanning on path contents ingested like this.
references: vec![],
narinfo: Some(crate::proto::NarInfo {
narinfo: Some(NarInfo {
nar_size,
nar_sha256: nar_sha256.to_vec().into(),
signatures: vec![],
@ -150,27 +139,14 @@ impl TvixStoreIO {
}),
};
// put into [PathInfoService], and return the PathInfo that we get back
// from there (it might contain additional signatures).
// put into [PathInfoService], and return the [PathInfo] that we get
// back from there (it might contain additional signatures).
let path_info = self.path_info_service.put(path_info)?;
Ok(path_info)
}
}
/// Returns the [StorePath] that contents with the given NAR sha256 digest
/// and `name` would be stored under.
///
/// The digest is logged BASE64-encoded via the `instrument` field instead of
/// being recorded as a raw argument.
///
/// # Panics
///
/// Panics if [build_regular_ca_path] fails for the given `name`.
#[instrument(skip(nar_sha256_digest), ret, fields(nar_sha256_digest=BASE64.encode(nar_sha256_digest)))]
fn calculate_nar_based_store_path(nar_sha256_digest: &[u8; 32], name: &str) -> StorePath {
    // The input is a fixed-size 32-byte array, so the hash struct can be
    // filled in directly without any length validation.
    let sha256 = NixHash {
        algo: HashAlgo::Sha256,
        digest: nar_sha256_digest.to_vec(),
    };
    let recursive_hash = NixHashWithMode::Recursive(sha256);

    // No references are passed. NOTE(review): the trailing `false` is
    // presumably the "self reference" flag -- confirm against
    // build_regular_ca_path.
    let no_references: Vec<String> = Vec::new();
    build_regular_ca_path(name, &recursive_hash, no_references, false).unwrap()
}
impl EvalIO for TvixStoreIO {
#[instrument(skip(self), ret, err)]
fn path_exists(&self, path: &Path) -> Result<bool, io::Error> {
@ -201,14 +177,14 @@ impl EvalIO for TvixStoreIO {
if let Some(node) = self.store_path_to_root_node(&store_path, &sub_path)? {
// depending on the node type, treat read_to_string differently
match node {
crate::proto::node::Node::Directory(_) => {
Node::Directory(_) => {
// This would normally be a io::ErrorKind::IsADirectory (still unstable)
Err(io::Error::new(
io::ErrorKind::Unsupported,
"tried to read directory at {path} to string",
format!("tried to read directory at {:?} to string", path),
))
}
crate::proto::node::Node::File(file_node) => {
Node::File(file_node) => {
let digest: B3Digest =
file_node.digest.clone().try_into().map_err(|_e| {
error!(
@ -240,7 +216,7 @@ impl EvalIO for TvixStoreIO {
io::read_to_string(reader)
}
crate::proto::node::Node::Symlink(_symlink_node) => Err(io::Error::new(
Node::Symlink(_symlink_node) => Err(io::Error::new(
io::ErrorKind::Unsupported,
"read_to_string for symlinks is unsupported",
))?,
@ -263,7 +239,7 @@ impl EvalIO for TvixStoreIO {
{
if let Some(node) = self.store_path_to_root_node(&store_path, &sub_path)? {
match node {
crate::proto::node::Node::Directory(directory_node) => {
Node::Directory(directory_node) => {
// fetch the Directory itself.
let digest = directory_node.digest.clone().try_into().map_err(|_e| {
io::Error::new(
@ -279,15 +255,9 @@ impl EvalIO for TvixStoreIO {
let mut children: Vec<(bytes::Bytes, FileType)> = Vec::new();
for node in directory.nodes() {
children.push(match node {
crate::proto::node::Node::Directory(e) => {
(e.name, FileType::Directory)
}
crate::proto::node::Node::File(e) => {
(e.name, FileType::Regular)
}
crate::proto::node::Node::Symlink(e) => {
(e.name, FileType::Symlink)
}
Node::Directory(e) => (e.name, FileType::Directory),
Node::File(e) => (e.name, FileType::Regular),
Node::Symlink(e) => (e.name, FileType::Symlink),
})
}
Ok(children)
@ -304,14 +274,14 @@ impl EvalIO for TvixStoreIO {
))?
}
}
crate::proto::node::Node::File(_file_node) => {
Node::File(_file_node) => {
// This would normally be a io::ErrorKind::NotADirectory (still unstable)
Err(io::Error::new(
io::ErrorKind::Unsupported,
"tried to readdir path {:?}, which is a file",
))?
}
crate::proto::node::Node::Symlink(_symlink_node) => Err(io::Error::new(
Node::Symlink(_symlink_node) => Err(io::Error::new(
io::ErrorKind::Unsupported,
"read_dir for symlinks is unsupported",
))?,

View file

@ -121,6 +121,18 @@ pub fn build_regular_ca_path<S: AsRef<str>, I: IntoIterator<Item = S>>(
}
}
/// Builds the [StorePath] for contents identified by the sha256 digest of
/// their NAR representation, stored under the given `name`.
///
/// The digest is wrapped as a [NixHashWithMode::Recursive] sha256 hash and
/// handed to [build_regular_ca_path] with an empty list of references.
///
/// # Panics
///
/// Panics if [build_regular_ca_path] returns an error for `name`.
pub fn build_nar_based_store_path(nar_sha256_digest: &[u8; 32], name: &str) -> StorePath {
    // A `[u8; 32]` already has the correct sha256 length, so the hash can be
    // constructed field-by-field instead of going through a checked parser.
    let digest = nar_sha256_digest.to_vec();
    let ca_hash = NixHashWithMode::Recursive(NixHash {
        algo: HashAlgo::Sha256,
        digest,
    });

    // NOTE(review): the final `false` argument is presumably the
    // "self reference" flag -- confirm against build_regular_ca_path.
    let store_path = build_regular_ca_path(name, &ca_hash, Vec::<String>::new(), false);
    store_path.unwrap()
}
/// This builds an input-addressed store path
///
/// Input-addressed store paths are always derivation outputs, the "input" in question is the

View file

@ -15,7 +15,6 @@ prost = "0.11.2"
rayon = "1.6.1"
sha2 = "0.10.6"
sled = { version = "0.34.7", features = ["compression"] }
tvix-eval = { path = "../eval" }
thiserror = "1.0.38"
tokio-stream = "0.1.14"
tokio = { version = "1.28.0", features = ["rt-multi-thread", "net"] }

View file

@ -1,13 +1,14 @@
use clap::Subcommand;
use data_encoding::BASE64;
use futures::future::try_join_all;
use nix_compat::store_path;
use std::io;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use tracing_subscriber::prelude::*;
use tvix_store::blobservice;
use tvix_store::directoryservice;
use tvix_store::import;
use tvix_store::pathinfoservice;
use tvix_store::proto::blob_service_server::BlobServiceServer;
use tvix_store::proto::directory_service_server::DirectoryServiceServer;
@ -16,7 +17,8 @@ use tvix_store::proto::path_info_service_server::PathInfoServiceServer;
use tvix_store::proto::GRPCBlobServiceWrapper;
use tvix_store::proto::GRPCDirectoryServiceWrapper;
use tvix_store::proto::GRPCPathInfoServiceWrapper;
use tvix_store::TvixStoreIO;
use tvix_store::proto::NarInfo;
use tvix_store::proto::PathInfo;
use tvix_store::FUSE;
#[cfg(feature = "reflection")]
@ -173,6 +175,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
directory_service_addr,
path_info_service_addr,
} => {
// FUTUREWORK: allow flat for single files?
let blob_service = blobservice::from_addr(&blob_service_addr)?;
let directory_service = directoryservice::from_addr(&directory_service_addr)?;
let path_info_service = pathinfoservice::from_addr(
@ -181,20 +184,60 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
directory_service.clone(),
)?;
let io = Arc::new(TvixStoreIO::new(
blob_service,
directory_service,
path_info_service,
));
let tasks = paths
.iter()
.into_iter()
.map(|path| {
let io_move = io.clone();
let path = path.clone();
let blob_service = blob_service.clone();
let directory_service = directory_service.clone();
let path_info_service = path_info_service.clone();
let task: tokio::task::JoinHandle<Result<(), io::Error>> =
tokio::task::spawn_blocking(move || {
let path_info = io_move.import_path_with_pathinfo(&path)?;
// Ingest the path into blob and directory service.
let root_node = import::ingest_path(
blob_service.clone(),
directory_service.clone(),
&path,
)
.expect("failed to ingest path");
// Ask the PathInfoService for the NAR size and sha256
let (nar_size, nar_sha256) =
path_info_service.calculate_nar(&root_node)?;
// TODO: make a path_to_name helper function?
let name = path
.file_name()
.expect("path must not be ..")
.to_str()
.expect("path must be valid unicode");
let output_path =
store_path::build_nar_based_store_path(&nar_sha256, name);
// assemble a new root_node with a name that is derived from the nar hash.
let root_node =
root_node.rename(output_path.to_string().into_bytes().into());
// assemble the [crate::proto::PathInfo] object.
let path_info = PathInfo {
node: Some(tvix_store::proto::Node {
node: Some(root_node),
}),
// There's no reference scanning on path contents ingested like this.
references: vec![],
narinfo: Some(NarInfo {
nar_size,
nar_sha256: nar_sha256.to_vec().into(),
signatures: vec![],
reference_names: vec![],
}),
};
// put into [PathInfoService], and return the PathInfo that we get back
// from there (it might contain additional signatures).
let path_info = path_info_service.put(path_info)?;
print_node(&path_info.node.unwrap().node.unwrap(), &path);
Ok(())
});

View file

@ -2,7 +2,6 @@ mod digests;
mod errors;
#[cfg(feature = "fuse")]
mod fuse;
mod store_io;
pub mod blobservice;
pub mod directoryservice;
@ -13,7 +12,6 @@ pub mod proto;
pub use digests::B3Digest;
pub use errors::Error;
pub use store_io::TvixStoreIO;
#[cfg(feature = "fuse")]
pub use fuse::FUSE;