feat(tvix/store): implement TvixStoreIO
This providesEvalIO, asking given PathInfoService, DirectoryService and BlobService. Change-Id: I32f210f5a7aa8173ad9a7d53e8a5ac03619f527a Reviewed-on: https://cl.tvl.fyi/c/depot/+/8561 Tested-by: BuildkiteCI Autosubmit: flokli <flokli@flokli.de> Reviewed-by: tazjin <tazjin@tvl.su>
This commit is contained in:
parent
4b00f1d7ac
commit
5774117d5e
6 changed files with 397 additions and 38 deletions
3
tvix/Cargo.lock
generated
3
tvix/Cargo.lock
generated
|
@ -2708,8 +2708,10 @@ dependencies = [
|
|||
"prost",
|
||||
"prost-build",
|
||||
"rayon",
|
||||
"serde_json",
|
||||
"sha2 0.10.6",
|
||||
"sled",
|
||||
"smol_str",
|
||||
"tempfile",
|
||||
"test-case",
|
||||
"thiserror",
|
||||
|
@ -2723,6 +2725,7 @@ dependencies = [
|
|||
"tower",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"tvix-eval",
|
||||
"walkdir",
|
||||
]
|
||||
|
||||
|
|
|
@ -8032,6 +8032,10 @@ rec {
|
|||
name = "rayon";
|
||||
packageId = "rayon";
|
||||
}
|
||||
{
|
||||
name = "serde_json";
|
||||
packageId = "serde_json";
|
||||
}
|
||||
{
|
||||
name = "sha2";
|
||||
packageId = "sha2 0.10.6";
|
||||
|
@ -8041,6 +8045,10 @@ rec {
|
|||
packageId = "sled";
|
||||
features = [ "compression" ];
|
||||
}
|
||||
{
|
||||
name = "smol_str";
|
||||
packageId = "smol_str";
|
||||
}
|
||||
{
|
||||
name = "thiserror";
|
||||
packageId = "thiserror";
|
||||
|
@ -8081,6 +8089,10 @@ rec {
|
|||
packageId = "tracing-subscriber";
|
||||
features = [ "json" ];
|
||||
}
|
||||
{
|
||||
name = "tvix-eval";
|
||||
packageId = "tvix-eval";
|
||||
}
|
||||
{
|
||||
name = "walkdir";
|
||||
packageId = "walkdir";
|
||||
|
|
|
@ -15,6 +15,7 @@ prost = "0.11.2"
|
|||
rayon = "1.6.1"
|
||||
sha2 = "0.10.6"
|
||||
sled = { version = "0.34.7", features = ["compression"] }
|
||||
tvix-eval = { path = "../eval" }
|
||||
thiserror = "1.0.38"
|
||||
tokio-stream = "0.1.14"
|
||||
tokio = { version = "1.28.0", features = ["rt-multi-thread", "net"] }
|
||||
|
@ -26,6 +27,8 @@ tokio-util = { version = "0.7.8", features = ["io", "io-util"] }
|
|||
tower = "0.4.13"
|
||||
futures = "0.3.28"
|
||||
bytes = "1.4.0"
|
||||
smol_str = "0.2.0"
|
||||
serde_json = "1.0"
|
||||
|
||||
[dependencies.tonic-reflection]
|
||||
optional = true
|
||||
|
|
|
@ -1,16 +1,9 @@
|
|||
use clap::Subcommand;
|
||||
use data_encoding::BASE64;
|
||||
use nix_compat::derivation::Derivation;
|
||||
use nix_compat::derivation::Output;
|
||||
use nix_compat::nixhash::HashAlgo;
|
||||
use nix_compat::nixhash::NixHash;
|
||||
use nix_compat::nixhash::NixHashWithMode;
|
||||
use std::path::PathBuf;
|
||||
use tracing_subscriber::prelude::*;
|
||||
use tvix_store::blobservice::SledBlobService;
|
||||
use tvix_store::directoryservice::SledDirectoryService;
|
||||
use tvix_store::import::ingest_path;
|
||||
use tvix_store::nar::NARCalculationService;
|
||||
use tvix_store::nar::NonCachingNARCalculationService;
|
||||
use tvix_store::pathinfoservice::SledPathInfoService;
|
||||
use tvix_store::proto::blob_service_server::BlobServiceServer;
|
||||
|
@ -19,6 +12,7 @@ use tvix_store::proto::path_info_service_server::PathInfoServiceServer;
|
|||
use tvix_store::proto::GRPCBlobServiceWrapper;
|
||||
use tvix_store::proto::GRPCDirectoryServiceWrapper;
|
||||
use tvix_store::proto::GRPCPathInfoServiceWrapper;
|
||||
use tvix_store::TvixStoreIO;
|
||||
|
||||
#[cfg(feature = "reflection")]
|
||||
use tvix_store::proto::FILE_DESCRIPTOR_SET;
|
||||
|
@ -85,8 +79,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||
tracing::subscriber::set_global_default(subscriber).expect("Unable to set global subscriber");
|
||||
|
||||
// initialize stores
|
||||
let mut blob_service = SledBlobService::new("blobs.sled".into())?;
|
||||
let mut directory_service = SledDirectoryService::new("directories.sled".into())?;
|
||||
let blob_service = SledBlobService::new("blobs.sled".into())?;
|
||||
let directory_service = SledDirectoryService::new("directories.sled".into())?;
|
||||
let path_info_service = SledPathInfoService::new("pathinfo.sled".into())?;
|
||||
|
||||
match cli.command {
|
||||
|
@ -134,37 +128,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||
directory_service.clone(),
|
||||
);
|
||||
|
||||
let mut io = TvixStoreIO::new(
|
||||
blob_service,
|
||||
directory_service,
|
||||
path_info_service,
|
||||
nar_calculation_service,
|
||||
);
|
||||
|
||||
for path in paths {
|
||||
let root_node = ingest_path(&mut blob_service, &mut directory_service, &path)?;
|
||||
let path_info = io
|
||||
.import_path_with_pathinfo(&path)
|
||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
|
||||
|
||||
let nar_hash = NixHashWithMode::Recursive(NixHash::new(
|
||||
HashAlgo::Sha256,
|
||||
nar_calculation_service
|
||||
.calculate_nar(&root_node)?
|
||||
.1
|
||||
.to_vec(),
|
||||
));
|
||||
|
||||
let mut drv = Derivation::default();
|
||||
drv.outputs.insert(
|
||||
"out".to_string(),
|
||||
Output {
|
||||
path: "".to_string(),
|
||||
hash_with_mode: Some(nar_hash),
|
||||
},
|
||||
);
|
||||
drv.calculate_output_paths(
|
||||
path.file_name()
|
||||
.expect("path must not be ..")
|
||||
.to_str()
|
||||
.expect("path must be valid unicode"),
|
||||
// Note the derivation_or_fod_hash argument is *unused* for FODs, so it doesn't matter what we pass here.
|
||||
&NixHash::new(HashAlgo::Sha256, vec![]),
|
||||
)?;
|
||||
|
||||
println!("{}", drv.outputs.get("out").unwrap().path);
|
||||
|
||||
match root_node {
|
||||
match path_info.node.unwrap().node.unwrap() {
|
||||
tvix_store::proto::node::Node::Directory(directory_node) => {
|
||||
info!(
|
||||
path = ?path,
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
mod digests;
|
||||
mod errors;
|
||||
mod store_io;
|
||||
|
||||
pub mod blobservice;
|
||||
pub mod directoryservice;
|
||||
|
@ -10,6 +11,7 @@ pub mod proto;
|
|||
|
||||
pub use digests::B3Digest;
|
||||
pub use errors::Error;
|
||||
pub use store_io::TvixStoreIO;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
|
363
tvix/store/src/store_io.rs
Normal file
363
tvix/store/src/store_io.rs
Normal file
|
@ -0,0 +1,363 @@
|
|||
//! This module provides an implementation of EvalIO.
|
||||
//!
|
||||
//! It can be used by the tvix evalutator to talk to a tvix store.
|
||||
|
||||
use data_encoding::BASE64;
|
||||
use nix_compat::{
|
||||
nixhash::{HashAlgo, NixHash, NixHashWithMode},
|
||||
store_path::{build_regular_ca_path, StorePath},
|
||||
};
|
||||
use smol_str::SmolStr;
|
||||
use std::{io, path::Path, path::PathBuf};
|
||||
use tracing::{error, instrument, warn};
|
||||
use tvix_eval::{EvalIO, FileType, StdIO};
|
||||
|
||||
use crate::{
|
||||
blobservice::BlobService,
|
||||
directoryservice::{self, DirectoryService},
|
||||
import,
|
||||
nar::NARCalculationService,
|
||||
pathinfoservice::PathInfoService,
|
||||
proto::NamedNode,
|
||||
B3Digest,
|
||||
};
|
||||
|
||||
/// Implements [EvalIO], asking given [PathInfoService], [DirectoryService]
|
||||
/// and [BlobService].
|
||||
///
|
||||
/// In case the given path does not exist in these stores, we ask StdIO.
|
||||
/// This is to both cover cases of syntactically valid store paths, that exist
|
||||
/// on the filesystem (still managed by Nix), as well as being able to read
|
||||
/// files outside store paths.
|
||||
pub struct TvixStoreIO<
|
||||
BS: BlobService,
|
||||
DS: DirectoryService,
|
||||
PS: PathInfoService,
|
||||
NCS: NARCalculationService,
|
||||
> {
|
||||
blob_service: BS,
|
||||
directory_service: DS,
|
||||
path_info_service: PS,
|
||||
nar_calculation_service: NCS,
|
||||
std_io: StdIO,
|
||||
}
|
||||
|
||||
impl<BS: BlobService, DS: DirectoryService, PS: PathInfoService, NCS: NARCalculationService>
|
||||
TvixStoreIO<BS, DS, PS, NCS>
|
||||
{
|
||||
pub fn new(
|
||||
blob_service: BS,
|
||||
directory_service: DS,
|
||||
path_info_service: PS,
|
||||
nar_calculation_service: NCS,
|
||||
) -> Self {
|
||||
Self {
|
||||
blob_service,
|
||||
directory_service,
|
||||
path_info_service,
|
||||
nar_calculation_service,
|
||||
std_io: StdIO {},
|
||||
}
|
||||
}
|
||||
|
||||
/// for a given [StorePath] and additional [Path] inside the store path,
|
||||
/// look up the [PathInfo], and if it exists, traverse the directory structure to
|
||||
/// return the [crate::proto::node::Node] specified by `sub_path`.
|
||||
#[instrument(skip(self), ret, err)]
|
||||
fn store_path_to_root_node(
|
||||
&mut self,
|
||||
store_path: &StorePath,
|
||||
sub_path: &Path,
|
||||
) -> Result<Option<crate::proto::node::Node>, crate::Error> {
|
||||
let path_info = {
|
||||
match self.path_info_service.get(store_path.digest)? {
|
||||
// If there's no PathInfo found, early exit
|
||||
None => return Ok(None),
|
||||
Some(path_info) => path_info,
|
||||
}
|
||||
};
|
||||
|
||||
let root_node = {
|
||||
match path_info.node {
|
||||
None => {
|
||||
warn!(
|
||||
"returned PathInfo {:?} node is None, this shouldn't happen.",
|
||||
&path_info
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
Some(root_node) => match root_node.node {
|
||||
None => {
|
||||
warn!("node for {:?} is None, this shouldn't happen.", &root_node);
|
||||
return Ok(None);
|
||||
}
|
||||
Some(root_node) => root_node,
|
||||
},
|
||||
}
|
||||
};
|
||||
|
||||
directoryservice::traverse_to(&mut self.directory_service, root_node, sub_path)
|
||||
}
|
||||
|
||||
/// Imports a given path on the filesystem into the store, and returns the
|
||||
/// [crate::proto::PathInfo] describing the path, that was sent to
|
||||
/// [PathInfoService].
|
||||
/// While not part of the [EvalIO], it's still useful for clients who
|
||||
/// care about the [PathInfo].
|
||||
#[instrument(skip(self), ret, err)]
|
||||
pub fn import_path_with_pathinfo(
|
||||
&mut self,
|
||||
path: &std::path::Path,
|
||||
) -> Result<crate::proto::PathInfo, io::Error> {
|
||||
// Call [import::ingest_path], which will walk over the given path and return a root_node.
|
||||
let root_node =
|
||||
import::ingest_path(&mut self.blob_service, &mut self.directory_service, path)
|
||||
.expect("error during import_path");
|
||||
|
||||
// Render the NAR
|
||||
let (nar_size, nar_sha256) = self
|
||||
.nar_calculation_service
|
||||
.calculate_nar(&root_node)
|
||||
.expect("error during nar calculation"); // TODO: handle error
|
||||
|
||||
// For given NAR sha256 digest and name, return the new [StorePath] this would have.
|
||||
let nar_hash_with_mode =
|
||||
NixHashWithMode::Recursive(NixHash::new(HashAlgo::Sha256, nar_sha256.to_vec()));
|
||||
|
||||
let name = path
|
||||
.file_name()
|
||||
.expect("path must not be ..")
|
||||
.to_str()
|
||||
.expect("path must be valid unicode");
|
||||
|
||||
let output_path =
|
||||
build_regular_ca_path(name, &nar_hash_with_mode, Vec::<String>::new(), false).unwrap();
|
||||
|
||||
// assemble a new root_node with a name that is derived from the nar hash.
|
||||
let renamed_root_node = {
|
||||
let name = output_path.to_string();
|
||||
|
||||
match root_node {
|
||||
crate::proto::node::Node::Directory(n) => {
|
||||
crate::proto::node::Node::Directory(crate::proto::DirectoryNode { name, ..n })
|
||||
}
|
||||
crate::proto::node::Node::File(n) => {
|
||||
crate::proto::node::Node::File(crate::proto::FileNode { name, ..n })
|
||||
}
|
||||
crate::proto::node::Node::Symlink(n) => {
|
||||
crate::proto::node::Node::Symlink(crate::proto::SymlinkNode { name, ..n })
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// assemble the [crate::proto::PathInfo] object.
|
||||
let path_info = crate::proto::PathInfo {
|
||||
node: Some(crate::proto::Node {
|
||||
node: Some(renamed_root_node),
|
||||
}),
|
||||
// There's no reference scanning on path contents ingested like this.
|
||||
references: vec![],
|
||||
narinfo: Some(crate::proto::NarInfo {
|
||||
nar_size,
|
||||
nar_sha256: nar_sha256.to_vec(),
|
||||
signatures: vec![],
|
||||
reference_names: vec![],
|
||||
// TODO: narinfo for talosctl.src contains `CA: fixed:r:sha256:1x13j5hy75221bf6kz7cpgld9vgic6bqx07w5xjs4pxnksj6lxb6`
|
||||
// do we need this anywhere?
|
||||
}),
|
||||
};
|
||||
|
||||
// put into [PathInfoService], and return the PathInfo that we get back
|
||||
// from there (it might contain additional signatures).
|
||||
let path_info = self.path_info_service.put(path_info)?;
|
||||
|
||||
Ok(path_info)
|
||||
}
|
||||
}
|
||||
|
||||
/// For given NAR sha256 digest and name, return the new [StorePath] this would have.
|
||||
#[instrument(skip(nar_sha256_digest), ret, fields(nar_sha256_digest=BASE64.encode(nar_sha256_digest)))]
|
||||
fn calculate_nar_based_store_path(nar_sha256_digest: &[u8; 32], name: &str) -> StorePath {
|
||||
let nar_hash_with_mode =
|
||||
NixHashWithMode::Recursive(NixHash::new(HashAlgo::Sha256, nar_sha256_digest.to_vec()));
|
||||
|
||||
build_regular_ca_path(name, &nar_hash_with_mode, Vec::<String>::new(), false).unwrap()
|
||||
}
|
||||
|
||||
impl<
|
||||
BS: BlobService + Clone,
|
||||
DS: DirectoryService + Clone,
|
||||
PS: PathInfoService,
|
||||
NCS: NARCalculationService,
|
||||
> EvalIO for TvixStoreIO<BS, DS, PS, NCS>
|
||||
{
|
||||
#[instrument(skip(self), ret, err)]
|
||||
fn path_exists(&mut self, path: &Path) -> Result<bool, io::Error> {
|
||||
if let Ok((store_path, sub_path)) =
|
||||
StorePath::from_absolute_path_full(&path.to_string_lossy())
|
||||
{
|
||||
if self
|
||||
.store_path_to_root_node(&store_path, &sub_path)?
|
||||
.is_some()
|
||||
{
|
||||
Ok(true)
|
||||
} else {
|
||||
// As tvix-store doesn't manage /nix/store on the filesystem,
|
||||
// we still need to also ask self.std_io here.
|
||||
self.std_io.path_exists(path)
|
||||
}
|
||||
} else {
|
||||
// The store path is no store path, so do regular StdIO.
|
||||
self.std_io.path_exists(path)
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(self), ret, err)]
|
||||
fn read_to_string(&mut self, path: &Path) -> Result<String, io::Error> {
|
||||
if let Ok((store_path, sub_path)) =
|
||||
StorePath::from_absolute_path_full(&path.to_string_lossy())
|
||||
{
|
||||
if let Some(node) = self.store_path_to_root_node(&store_path, &sub_path)? {
|
||||
// depending on the node type, treat read_to_string differently
|
||||
match node {
|
||||
crate::proto::node::Node::Directory(_) => {
|
||||
// This would normally be a io::ErrorKind::IsADirectory (still unstable)
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::Unsupported,
|
||||
"tried to read directory at {path} to string",
|
||||
))
|
||||
}
|
||||
crate::proto::node::Node::File(file_node) => {
|
||||
let digest =
|
||||
B3Digest::from_vec(file_node.digest.clone()).map_err(|_e| {
|
||||
error!(
|
||||
file_node = ?file_node,
|
||||
"invalid digest"
|
||||
);
|
||||
io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
format!("invalid digest length in file node: {:?}", file_node),
|
||||
)
|
||||
})?;
|
||||
|
||||
let reader = {
|
||||
let resp = self.blob_service.open_read(&digest)?;
|
||||
match resp {
|
||||
Some(blob_reader) => blob_reader,
|
||||
None => {
|
||||
error!(
|
||||
blob.digest = %digest,
|
||||
"blob not found",
|
||||
);
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::NotFound,
|
||||
format!("blob {} not found", &digest),
|
||||
))?
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
io::read_to_string(reader)
|
||||
}
|
||||
crate::proto::node::Node::Symlink(_symlink_node) => Err(io::Error::new(
|
||||
io::ErrorKind::Unsupported,
|
||||
"read_to_string for symlinks is unsupported",
|
||||
))?,
|
||||
}
|
||||
} else {
|
||||
// As tvix-store doesn't manage /nix/store on the filesystem,
|
||||
// we still need to also ask self.std_io here.
|
||||
self.std_io.read_to_string(path)
|
||||
}
|
||||
} else {
|
||||
// The store path is no store path, so do regular StdIO.
|
||||
self.std_io.read_to_string(path)
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(self), ret, err)]
|
||||
fn read_dir(&mut self, path: &Path) -> Result<Vec<(SmolStr, FileType)>, io::Error> {
|
||||
if let Ok((store_path, sub_path)) =
|
||||
StorePath::from_absolute_path_full(&path.to_string_lossy())
|
||||
{
|
||||
if let Some(node) = self.store_path_to_root_node(&store_path, &sub_path)? {
|
||||
match node {
|
||||
crate::proto::node::Node::Directory(directory_node) => {
|
||||
// fetch the Directory itself.
|
||||
let digest =
|
||||
B3Digest::from_vec(directory_node.digest.clone()).map_err(|_e| {
|
||||
io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
format!(
|
||||
"invalid digest length in directory node: {:?}",
|
||||
directory_node
|
||||
),
|
||||
)
|
||||
})?;
|
||||
|
||||
if let Some(directory) = self.directory_service.get(&digest)? {
|
||||
let mut children: Vec<(SmolStr, FileType)> = Vec::new();
|
||||
for node in directory.nodes() {
|
||||
children.push(match node {
|
||||
crate::proto::node::Node::Directory(e) => {
|
||||
(e.name.into(), FileType::Directory)
|
||||
}
|
||||
crate::proto::node::Node::File(e) => {
|
||||
(e.name.into(), FileType::Regular)
|
||||
}
|
||||
crate::proto::node::Node::Symlink(e) => {
|
||||
(e.name.into(), FileType::Symlink)
|
||||
}
|
||||
})
|
||||
}
|
||||
Ok(children)
|
||||
} else {
|
||||
// If we didn't get the directory node that's linked, that's a store inconsistency!
|
||||
error!(
|
||||
directory.digest = %digest,
|
||||
path = ?path,
|
||||
"directory not found",
|
||||
);
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::NotFound,
|
||||
format!("directory {digest} does not exist"),
|
||||
))?
|
||||
}
|
||||
}
|
||||
crate::proto::node::Node::File(_file_node) => {
|
||||
// This would normally be a io::ErrorKind::NotADirectory (still unstable)
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::Unsupported,
|
||||
"tried to readdir path {:?}, which is a file",
|
||||
))?
|
||||
}
|
||||
crate::proto::node::Node::Symlink(_symlink_node) => Err(io::Error::new(
|
||||
io::ErrorKind::Unsupported,
|
||||
"read_dir for symlinks is unsupported",
|
||||
))?,
|
||||
}
|
||||
} else {
|
||||
self.std_io.read_dir(path)
|
||||
}
|
||||
} else {
|
||||
self.std_io.read_dir(path)
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(self), ret, err)]
|
||||
fn import_path(&mut self, path: &std::path::Path) -> Result<PathBuf, std::io::Error> {
|
||||
let path_info = self.import_path_with_pathinfo(path)?;
|
||||
|
||||
// from the [PathInfo], extract the store path (as string).
|
||||
let mut path = PathBuf::from(nix_compat::store_path::STORE_DIR_WITH_SLASH);
|
||||
path.push(path_info.node.unwrap().node.unwrap().get_name());
|
||||
|
||||
// and return it
|
||||
Ok(path)
|
||||
}
|
||||
|
||||
#[instrument(skip(self), ret)]
|
||||
fn store_dir(&self) -> Option<String> {
|
||||
Some("/nix/store".to_string())
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue