refactor(tvix/store): drop calculate_nar from PathInfoService

This shouldn't be part of the PathInfoService trait.

Pretty much none of the PathInfoServices do implement it, and requiring
them to implement it means they also cannot make use of this calculation
already being done by other PathInfoServices.

Move it out into its own NarCalculationService trait, defined somewhere
at tvix_store::nar, and have everyone who wants to trigger nar
calculation use nar_calculation_service directly, which now is an
additional field in TvixStoreIO for example.

It being moved outside the PathInfoService trait doesn't prohibit
specific implementations to implement it (like the GRPC client for the
`PathInfoService` does.

This is currently wired together in a bit of a hacky fashion - as of
now, everything uses the naive implementation that traverses blob and
directoryservice, rather than composing it properly. I want to leave
that up to a later CL, dealing with other parts of store composition
too.

Change-Id: I18d07ea4301d4a07651b8218bc5fe95e4e307208
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11619
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
This commit is contained in:
Florian Klink 2024-05-10 08:59:25 +03:00 committed by clbot
parent 944a781354
commit 14766cfe1d
20 changed files with 241 additions and 187 deletions

View file

@ -81,21 +81,22 @@ struct Args {
}
fn init_io_handle(tokio_runtime: &tokio::runtime::Runtime, args: &Args) -> Rc<TvixStoreIO> {
let (blob_service, directory_service, path_info_service) = tokio_runtime
.block_on({
let blob_service_addr = args.blob_service_addr.clone();
let directory_service_addr = args.directory_service_addr.clone();
let path_info_service_addr = args.path_info_service_addr.clone();
async move {
tvix_store::utils::construct_services(
blob_service_addr,
directory_service_addr,
path_info_service_addr,
)
.await
}
})
.expect("unable to setup {blob|directory|pathinfo}service before interpreter setup");
let (blob_service, directory_service, path_info_service, nar_calculation_service) =
tokio_runtime
.block_on({
let blob_service_addr = args.blob_service_addr.clone();
let directory_service_addr = args.directory_service_addr.clone();
let path_info_service_addr = args.path_info_service_addr.clone();
async move {
tvix_store::utils::construct_services(
blob_service_addr,
directory_service_addr,
path_info_service_addr,
)
.await
}
})
.expect("unable to setup {blob|directory|pathinfo}service before interpreter setup");
let build_service = tokio_runtime
.block_on({
@ -116,6 +117,7 @@ fn init_io_handle(tokio_runtime: &tokio::runtime::Runtime, args: &Args) -> Rc<Tv
blob_service.clone(),
directory_service.clone(),
path_info_service.into(),
nar_calculation_service.into(),
build_service.into(),
tokio_runtime.handle().clone(),
))

View file

@ -2,10 +2,6 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion};
use lazy_static::lazy_static;
use std::{env, rc::Rc, sync::Arc, time::Duration};
use tvix_build::buildservice::DummyBuildService;
use tvix_castore::{
blobservice::{BlobService, MemoryBlobService},
directoryservice::{DirectoryService, MemoryDirectoryService},
};
use tvix_eval::{builtins::impure_builtins, EvalIO};
use tvix_glue::{
builtins::{add_derivation_builtins, add_fetcher_builtins, add_import_builtins},
@ -13,16 +9,9 @@ use tvix_glue::{
tvix_io::TvixIO,
tvix_store_io::TvixStoreIO,
};
use tvix_store::pathinfoservice::{MemoryPathInfoService, PathInfoService};
use tvix_store::utils::construct_services;
lazy_static! {
static ref BLOB_SERVICE: Arc<dyn BlobService> = Arc::new(MemoryBlobService::default());
static ref DIRECTORY_SERVICE: Arc<dyn DirectoryService> =
Arc::new(MemoryDirectoryService::default());
static ref PATH_INFO_SERVICE: Arc<dyn PathInfoService> = Arc::new(MemoryPathInfoService::new(
BLOB_SERVICE.clone(),
DIRECTORY_SERVICE.clone(),
));
static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Runtime::new().unwrap();
}
@ -30,12 +19,17 @@ fn interpret(code: &str) {
// TODO: this is a bit annoying.
// It'd be nice if we could set this up once and then run evaluate() with a
// piece of code. b/262
let (blob_service, directory_service, path_info_service, nar_calculation_service) =
TOKIO_RUNTIME
.block_on(async { construct_services("memory://", "memory://", "memory://").await })
.unwrap();
// We assemble a complete store in memory.
let tvix_store_io = Rc::new(TvixStoreIO::new(
BLOB_SERVICE.clone(),
DIRECTORY_SERVICE.clone(),
PATH_INFO_SERVICE.clone(),
blob_service,
directory_service,
path_info_service.into(),
nar_calculation_service.into(),
Arc::<DummyBuildService>::default(),
TOKIO_RUNTIME.handle().clone(),
));

View file

@ -178,7 +178,7 @@ mod import_builtins {
CAHash::Nar(NixHash::Sha256(state.tokio_handle.block_on(async {
Ok::<_, tvix_eval::ErrorKind>(
state
.path_info_service
.nar_calculation_service
.as_ref()
.calculate_nar(&root_node)
.await
@ -255,7 +255,7 @@ mod import_builtins {
.tokio_handle
.block_on(async {
let (_, nar_sha256) = state
.path_info_service
.nar_calculation_service
.as_ref()
.calculate_nar(&root_node)
.await?;

View file

@ -68,7 +68,7 @@ mod tests {
fn eval(str: &str) -> EvaluationResult {
// We assemble a complete store in memory.
let runtime = tokio::runtime::Runtime::new().expect("Failed to build a Tokio runtime");
let (blob_service, directory_service, path_info_service) = runtime
let (blob_service, directory_service, path_info_service, nar_calculation_service) = runtime
.block_on(async { construct_services("memory://", "memory://", "memory://").await })
.expect("Failed to construct store services in memory");
@ -76,6 +76,7 @@ mod tests {
blob_service,
directory_service,
path_info_service.into(),
nar_calculation_service.into(),
Arc::<DummyBuildService>::default(),
runtime.handle().clone(),
));

View file

@ -14,7 +14,7 @@ use tvix_castore::{
directoryservice::DirectoryService,
proto::{node::Node, FileNode},
};
use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo};
use tvix_store::{nar::NarCalculationService, pathinfoservice::PathInfoService, proto::PathInfo};
use url::Url;
use crate::builtins::FetcherError;
@ -106,20 +106,27 @@ impl Fetch {
}
/// Knows how to fetch a given [Fetch].
pub struct Fetcher<BS, DS, PS> {
pub struct Fetcher<BS, DS, PS, NS> {
http_client: reqwest::Client,
blob_service: BS,
directory_service: DS,
path_info_service: PS,
nar_calculation_service: NS,
}
impl<BS, DS, PS> Fetcher<BS, DS, PS> {
pub fn new(blob_service: BS, directory_service: DS, path_info_service: PS) -> Self {
impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS> {
pub fn new(
blob_service: BS,
directory_service: DS,
path_info_service: PS,
nar_calculation_service: NS,
) -> Self {
Self {
http_client: reqwest::Client::new(),
blob_service,
directory_service,
path_info_service,
nar_calculation_service,
}
}
@ -170,11 +177,12 @@ async fn hash<D: Digest + std::io::Write>(
Ok((hasher.finalize(), bytes_copied))
}
impl<BS, DS, PS> Fetcher<BS, DS, PS>
impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS>
where
BS: BlobService + Clone + 'static,
DS: DirectoryService + Clone,
PS: PathInfoService,
NS: NarCalculationService,
{
/// Ingest the data from a specified [Fetch].
/// On success, return the root node, a content digest and length.
@ -257,7 +265,7 @@ where
// Even if no expected NAR sha256 has been provided, we need
// the actual one later.
let (nar_size, actual_nar_sha256) = self
.path_info_service
.nar_calculation_service
.calculate_nar(&node)
.await
.map_err(|e| {
@ -309,7 +317,7 @@ where
// the [PathInfo].
let (nar_size, nar_sha256) = match &ca_hash {
CAHash::Flat(_nix_hash) => self
.path_info_service
.nar_calculation_service
.calculate_nar(&node)
.await
.map_err(|e| FetcherError::Io(e.into()))?,

View file

@ -3,12 +3,8 @@ use std::{rc::Rc, sync::Arc};
use pretty_assertions::assert_eq;
use std::path::PathBuf;
use tvix_build::buildservice::DummyBuildService;
use tvix_castore::{
blobservice::{BlobService, MemoryBlobService},
directoryservice::{DirectoryService, MemoryDirectoryService},
};
use tvix_eval::{EvalIO, Value};
use tvix_store::pathinfoservice::{MemoryPathInfoService, PathInfoService};
use tvix_store::utils::construct_services;
use rstest::rstest;
@ -36,19 +32,17 @@ fn eval_test(code_path: PathBuf, expect_success: bool) {
return;
}
let blob_service = Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>;
let directory_service =
Arc::new(MemoryDirectoryService::default()) as Arc<dyn DirectoryService>;
let path_info_service = Box::new(MemoryPathInfoService::new(
blob_service.clone(),
directory_service.clone(),
)) as Box<dyn PathInfoService>;
let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
let (blob_service, directory_service, path_info_service, nar_calculation_service) =
tokio_runtime
.block_on(async { construct_services("memory://", "memory://", "memory://").await })
.unwrap();
let tvix_store_io = Rc::new(TvixStoreIO::new(
blob_service,
directory_service,
path_info_service.into(),
nar_calculation_service.into(),
Arc::new(DummyBuildService::default()),
tokio_runtime.handle().clone(),
));

View file

@ -18,6 +18,7 @@ use tracing::{error, info, instrument, warn, Level};
use tvix_build::buildservice::BuildService;
use tvix_castore::proto::node::Node;
use tvix_eval::{EvalIO, FileType, StdIO};
use tvix_store::nar::NarCalculationService;
use tvix_store::utils::AsyncIoBridge;
use tvix_castore::{
@ -52,13 +53,20 @@ pub struct TvixStoreIO {
pub(crate) blob_service: Arc<dyn BlobService>,
pub(crate) directory_service: Arc<dyn DirectoryService>,
pub(crate) path_info_service: Arc<dyn PathInfoService>,
pub(crate) nar_calculation_service: Arc<dyn NarCalculationService>,
std_io: StdIO,
#[allow(dead_code)]
build_service: Arc<dyn BuildService>,
pub(crate) tokio_handle: tokio::runtime::Handle,
pub(crate) fetcher:
Fetcher<Arc<dyn BlobService>, Arc<dyn DirectoryService>, Arc<dyn PathInfoService>>,
#[allow(clippy::type_complexity)]
pub(crate) fetcher: Fetcher<
Arc<dyn BlobService>,
Arc<dyn DirectoryService>,
Arc<dyn PathInfoService>,
Arc<dyn NarCalculationService>,
>,
// Paths known how to produce, by building or fetching.
pub(crate) known_paths: RefCell<KnownPaths>,
@ -69,6 +77,7 @@ impl TvixStoreIO {
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
path_info_service: Arc<dyn PathInfoService>,
nar_calculation_service: Arc<dyn NarCalculationService>,
build_service: Arc<dyn BuildService>,
tokio_handle: tokio::runtime::Handle,
) -> Self {
@ -76,10 +85,16 @@ impl TvixStoreIO {
blob_service: blob_service.clone(),
directory_service: directory_service.clone(),
path_info_service: path_info_service.clone(),
nar_calculation_service: nar_calculation_service.clone(),
std_io: StdIO {},
build_service,
tokio_handle,
fetcher: Fetcher::new(blob_service, directory_service, path_info_service),
fetcher: Fetcher::new(
blob_service,
directory_service,
path_info_service,
nar_calculation_service,
),
known_paths: Default::default(),
}
}
@ -247,8 +262,10 @@ impl TvixStoreIO {
let root_node = output.node.as_ref().expect("invalid root node");
// calculate the nar representation
let (nar_size, nar_sha256) =
self.path_info_service.calculate_nar(root_node).await?;
let (nar_size, nar_sha256) = self
.nar_calculation_service
.calculate_nar(root_node)
.await?;
// assemble the PathInfo to persist
let path_info = PathInfo {
@ -323,7 +340,7 @@ impl TvixStoreIO {
// because the path info construct a narinfo which *always*
// require a SHA256 of the NAR representation and the NAR size.
let (nar_size, nar_sha256) = self
.path_info_service
.nar_calculation_service
.as_ref()
.calculate_nar(&root_node)
.await?;
@ -564,6 +581,7 @@ impl EvalIO for TvixStoreIO {
&self.blob_service,
&self.directory_service,
&self.path_info_service,
&self.nar_calculation_service,
)
.await
})?;
@ -584,12 +602,8 @@ mod tests {
use bstr::ByteSlice;
use tempfile::TempDir;
use tvix_build::buildservice::DummyBuildService;
use tvix_castore::{
blobservice::{BlobService, MemoryBlobService},
directoryservice::{DirectoryService, MemoryDirectoryService},
};
use tvix_eval::{EvalIO, EvaluationResult};
use tvix_store::pathinfoservice::MemoryPathInfoService;
use tvix_store::utils::construct_services;
use super::TvixStoreIO;
use crate::builtins::{add_derivation_builtins, add_fetcher_builtins, add_import_builtins};
@ -598,22 +612,19 @@ mod tests {
/// Takes care of setting up the evaluator so it knows about the
// `derivation` builtin.
fn eval(str: &str) -> EvaluationResult {
let blob_service = Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>;
let directory_service =
Arc::new(MemoryDirectoryService::default()) as Arc<dyn DirectoryService>;
let path_info_service = Arc::new(MemoryPathInfoService::new(
blob_service.clone(),
directory_service.clone(),
));
let runtime = tokio::runtime::Runtime::new().unwrap();
let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
let (blob_service, directory_service, path_info_service, nar_calculation_service) =
tokio_runtime
.block_on(async { construct_services("memory://", "memory://", "memory://").await })
.unwrap();
let io = Rc::new(TvixStoreIO::new(
blob_service.clone(),
directory_service.clone(),
path_info_service,
blob_service,
directory_service,
path_info_service.into(),
nar_calculation_service.into(),
Arc::<DummyBuildService>::default(),
runtime.handle().clone(),
tokio_runtime.handle().clone(),
));
let mut eval = tvix_eval::Evaluation::new(io.clone() as Rc<dyn EvalIO>, true);

View file

@ -19,6 +19,7 @@ use tracing_subscriber::EnvFilter;
use tracing_subscriber::Layer;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use tvix_castore::import::fs::ingest_path;
use tvix_store::nar::NarCalculationService;
use tvix_store::proto::NarInfo;
use tvix_store::proto::PathInfo;
@ -286,7 +287,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
path_info_service_addr,
} => {
// initialize stores
let (blob_service, directory_service, path_info_service) =
let (blob_service, directory_service, path_info_service, nar_calculation_service) =
tvix_store::utils::construct_services(
blob_service_addr,
directory_service_addr,
@ -311,6 +312,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
))
.add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new(
Arc::from(path_info_service),
nar_calculation_service,
)));
#[cfg(feature = "tonic-reflection")]
@ -340,7 +342,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
path_info_service_addr,
} => {
// FUTUREWORK: allow flat for single files?
let (blob_service, directory_service, path_info_service) =
let (blob_service, directory_service, path_info_service, nar_calculation_service) =
tvix_store::utils::construct_services(
blob_service_addr,
directory_service_addr,
@ -348,8 +350,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
)
.await?;
// Arc the PathInfoService, as we clone it .
// Arc PathInfoService and NarCalculationService, as we clone it .
let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
let nar_calculation_service: Arc<dyn NarCalculationService> =
nar_calculation_service.into();
let tasks = paths
.into_iter()
@ -358,6 +362,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let blob_service = blob_service.clone();
let directory_service = directory_service.clone();
let path_info_service = path_info_service.clone();
let nar_calculation_service = nar_calculation_service.clone();
async move {
if let Ok(name) = tvix_store::import::path_to_name(&path) {
@ -367,6 +372,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
blob_service,
directory_service,
path_info_service,
nar_calculation_service,
)
.await;
if let Ok(output_path) = resp {
@ -387,7 +393,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
path_info_service_addr,
reference_graph_path,
} => {
let (blob_service, directory_service, path_info_service) =
let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
tvix_store::utils::construct_services(
blob_service_addr,
directory_service_addr,
@ -494,7 +500,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
allow_other,
show_xattr,
} => {
let (blob_service, directory_service, path_info_service) =
let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
tvix_store::utils::construct_services(
blob_service_addr,
directory_service_addr,
@ -536,7 +542,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
list_root,
show_xattr,
} => {
let (blob_service, directory_service, path_info_service) =
let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
tvix_store::utils::construct_services(
blob_service_addr,
directory_service_addr,

View file

@ -11,6 +11,7 @@ use nix_compat::{
};
use crate::{
nar::NarCalculationService,
pathinfoservice::PathInfoService,
proto::{nar_info, NarInfo, PathInfo},
};
@ -104,25 +105,27 @@ pub fn derive_nar_ca_path_info(
/// Ingest the given path `path` and register the resulting output path in the
/// [`PathInfoService`] as a recursive fixed output NAR.
#[instrument(skip_all, fields(store_name=name, path=?path), err)]
pub async fn import_path_as_nar_ca<BS, DS, PS, P>(
pub async fn import_path_as_nar_ca<BS, DS, PS, NS, P>(
path: P,
name: &str,
blob_service: BS,
directory_service: DS,
path_info_service: PS,
nar_calculation_service: NS,
) -> Result<StorePath, std::io::Error>
where
P: AsRef<Path> + std::fmt::Debug,
BS: BlobService + Clone,
DS: DirectoryService,
PS: AsRef<dyn PathInfoService>,
NS: NarCalculationService,
{
let root_node = ingest_path(blob_service, directory_service, path.as_ref())
.await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
// Ask the PathInfoService for the NAR size and sha256
let (nar_size, nar_sha256) = path_info_service.as_ref().calculate_nar(&root_node).await?;
// Ask for the NAR size and sha256
let (nar_size, nar_sha256) = nar_calculation_service.calculate_nar(&root_node).await?;
// Calculate the output path. This might still fail, as some names are illegal.
// FUTUREWORK: express the `name` at the type level to be valid and move the conversion

View file

@ -1,3 +1,4 @@
use tonic::async_trait;
use tvix_castore::B3Digest;
mod import;
@ -5,6 +6,31 @@ mod renderer;
pub use import::ingest_nar;
pub use renderer::calculate_size_and_sha256;
pub use renderer::write_nar;
pub use renderer::SimpleRenderer;
use tvix_castore::proto as castorepb;
#[async_trait]
pub trait NarCalculationService: Send + Sync {
/// Return the nar size and nar sha256 digest for a given root node.
/// This can be used to calculate NAR-based output paths.
async fn calculate_nar(
&self,
root_node: &castorepb::node::Node,
) -> Result<(u64, [u8; 32]), tvix_castore::Error>;
}
#[async_trait]
impl<A> NarCalculationService for A
where
A: AsRef<dyn NarCalculationService> + Send + Sync,
{
async fn calculate_nar(
&self,
root_node: &castorepb::node::Node,
) -> Result<(u64, [u8; 32]), tvix_castore::Error> {
self.as_ref().calculate_nar(root_node).await
}
}
/// Errors that can encounter while rendering NARs.
#[derive(Debug, thiserror::Error)]

View file

@ -1,16 +1,51 @@
use crate::utils::AsyncIoBridge;
use super::RenderError;
use super::{NarCalculationService, RenderError};
use count_write::CountWrite;
use nix_compat::nar::writer::r#async as nar_writer;
use sha2::{Digest, Sha256};
use tokio::io::{self, AsyncWrite, BufReader};
use tonic::async_trait;
use tvix_castore::{
blobservice::BlobService,
directoryservice::DirectoryService,
proto::{self as castorepb, NamedNode},
};
pub struct SimpleRenderer<BS, DS> {
blob_service: BS,
directory_service: DS,
}
impl<BS, DS> SimpleRenderer<BS, DS> {
pub fn new(blob_service: BS, directory_service: DS) -> Self {
Self {
blob_service,
directory_service,
}
}
}
#[async_trait]
impl<BS, DS> NarCalculationService for SimpleRenderer<BS, DS>
where
BS: BlobService + Clone,
DS: DirectoryService + Clone,
{
async fn calculate_nar(
&self,
root_node: &castorepb::node::Node,
) -> Result<(u64, [u8; 32]), tvix_castore::Error> {
calculate_size_and_sha256(
root_node,
self.blob_service.clone(),
self.directory_service.clone(),
)
.await
.map_err(|e| tvix_castore::Error::StorageError(format!("failed rendering nar: {}", e)))
}
}
/// Invoke [write_nar], and return the size and sha256 digest of the produced
/// NAR output.
pub async fn calculate_size_and_sha256<BS, DS>(

View file

@ -11,7 +11,6 @@ use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DurationSeconds};
use tonic::async_trait;
use tracing::trace;
use tvix_castore::proto as castorepb;
use tvix_castore::Error;
/// There should not be more than 10 MiB in a single cell.
@ -330,13 +329,6 @@ impl PathInfoService for BigtablePathInfoService {
Ok(path_info)
}
async fn calculate_nar(
&self,
_root_node: &castorepb::node::Node,
) -> Result<(u64, [u8; 32]), Error> {
return Err(Error::StorageError("unimplemented".into()));
}
fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
let mut client = self.client.clone();

View file

@ -1,5 +1,8 @@
use super::PathInfoService;
use crate::proto::{self, ListPathInfoRequest, PathInfo};
use crate::{
nar::NarCalculationService,
proto::{self, ListPathInfoRequest, PathInfo},
};
use async_stream::try_stream;
use data_encoding::BASE64;
use futures::stream::BoxStream;
@ -67,30 +70,6 @@ impl PathInfoService for GRPCPathInfoService {
Ok(path_info)
}
#[instrument(level = "trace", skip_all, fields(root_node = ?root_node))]
async fn calculate_nar(
&self,
root_node: &castorepb::node::Node,
) -> Result<(u64, [u8; 32]), Error> {
let path_info = self
.grpc_client
.clone()
.calculate_nar(castorepb::Node {
node: Some(root_node.clone()),
})
.await
.map_err(|e| Error::StorageError(e.to_string()))?
.into_inner();
let nar_sha256: [u8; 32] = path_info
.nar_sha256
.to_vec()
.try_into()
.map_err(|_e| Error::StorageError("invalid digest length".to_string()))?;
Ok((path_info.nar_size, nar_sha256))
}
#[instrument(level = "trace", skip_all)]
fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
let mut grpc_client = self.grpc_client.clone();
@ -126,6 +105,33 @@ impl PathInfoService for GRPCPathInfoService {
}
}
#[async_trait]
impl NarCalculationService for GRPCPathInfoService {
#[instrument(level = "trace", skip_all, fields(root_node = ?root_node))]
async fn calculate_nar(
&self,
root_node: &castorepb::node::Node,
) -> Result<(u64, [u8; 32]), Error> {
let path_info = self
.grpc_client
.clone()
.calculate_nar(castorepb::Node {
node: Some(root_node.clone()),
})
.await
.map_err(|e| Error::StorageError(e.to_string()))?
.into_inner();
let nar_sha256: [u8; 32] = path_info
.nar_sha256
.to_vec()
.try_into()
.map_err(|_e| Error::StorageError("invalid digest length".to_string()))?;
Ok((path_info.nar_size, nar_sha256))
}
}
#[cfg(test)]
mod tests {
use crate::pathinfoservice::tests::make_grpc_path_info_service_client;

View file

@ -1,19 +1,20 @@
use super::PathInfoService;
use crate::{nar::calculate_size_and_sha256, proto::PathInfo};
use crate::proto::PathInfo;
use futures::stream::{iter, BoxStream};
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use tonic::async_trait;
use tvix_castore::proto as castorepb;
use tvix_castore::Error;
use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
pub struct MemoryPathInfoService<BS, DS> {
db: Arc<RwLock<HashMap<[u8; 20], PathInfo>>>,
#[allow(dead_code)]
blob_service: BS,
#[allow(dead_code)]
directory_service: DS,
}
@ -61,15 +62,6 @@ where
}
}
async fn calculate_nar(
&self,
root_node: &castorepb::node::Node,
) -> Result<(u64, [u8; 32]), Error> {
calculate_size_and_sha256(root_node, &self.blob_service, &self.directory_service)
.await
.map_err(|e| Error::StorageError(e.to_string()))
}
fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
let db = self.db.read().unwrap();

View file

@ -12,7 +12,6 @@ mod tests;
use futures::stream::BoxStream;
use tonic::async_trait;
use tvix_castore::proto as castorepb;
use tvix_castore::Error;
use crate::proto::PathInfo;
@ -41,14 +40,6 @@ pub trait PathInfoService: Send + Sync {
/// invalid messages.
async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error>;
/// Return the nar size and nar sha256 digest for a given root node.
/// This can be used to calculate NAR-based output paths,
/// and implementations are encouraged to cache it.
async fn calculate_nar(
&self,
root_node: &castorepb::node::Node,
) -> Result<(u64, [u8; 32]), Error>;
/// Iterate over all PathInfo objects in the store.
/// Implementations can decide to disallow listing.
///
@ -72,13 +63,6 @@ where
self.as_ref().put(path_info).await
}
async fn calculate_nar(
&self,
root_node: &castorepb::node::Node,
) -> Result<(u64, [u8; 32]), Error> {
self.as_ref().calculate_nar(root_node).await
}
fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
self.as_ref().list()
}

View file

@ -33,8 +33,7 @@ use super::PathInfoService;
///
/// The client is expected to be (indirectly) using the same [BlobService] and
/// [DirectoryService], so able to fetch referred Directories and Blobs.
/// [PathInfoService::put] and [PathInfoService::calculate_nar] are not
/// implemented and return an error if called.
/// [PathInfoService::put] is not implemented and returns an error if called.
/// TODO: what about reading from nix-cache-info?
pub struct NixHTTPPathInfoService<BS, DS> {
base_url: url::Url,
@ -258,16 +257,6 @@ where
))
}
#[instrument(skip_all, fields(root_node=?root_node))]
async fn calculate_nar(
&self,
root_node: &castorepb::node::Node,
) -> Result<(u64, [u8; 32]), Error> {
Err(Error::InvalidRequest(
"calculate_nar not supported for this backend".to_string(),
))
}
fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
Box::pin(futures::stream::once(async {
Err(Error::InvalidRequest(

View file

@ -1,5 +1,4 @@
use super::PathInfoService;
use crate::nar::calculate_size_and_sha256;
use crate::proto::PathInfo;
use async_stream::try_stream;
use data_encoding::BASE64;
@ -9,7 +8,6 @@ use std::path::Path;
use tonic::async_trait;
use tracing::instrument;
use tracing::warn;
use tvix_castore::proto as castorepb;
use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error};
/// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled).
@ -19,7 +17,9 @@ use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService,
pub struct SledPathInfoService<BS, DS> {
db: sled::Db,
#[allow(dead_code)]
blob_service: BS,
#[allow(dead_code)]
directory_service: DS,
}
@ -109,16 +109,6 @@ where
Ok(path_info)
}
#[instrument(level = "trace", skip_all, fields(root_node = ?root_node))]
async fn calculate_nar(
&self,
root_node: &castorepb::node::Node,
) -> Result<(u64, [u8; 32]), Error> {
calculate_size_and_sha256(root_node, &self.blob_service, &self.directory_service)
.await
.map_err(|e| Error::StorageError(e.to_string()))
}
fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
let db = self.db.clone();
let mut it = db.iter().values();

View file

@ -3,6 +3,7 @@ use std::sync::Arc;
use tonic::transport::{Endpoint, Server, Uri};
use crate::{
nar::{NarCalculationService, SimpleRenderer},
pathinfoservice::{GRPCPathInfoService, MemoryPathInfoService, PathInfoService},
proto::{
path_info_service_client::PathInfoServiceClient,
@ -25,13 +26,17 @@ pub async fn make_grpc_path_info_service_client() -> super::BSDSPS {
let blob_service = blob_service.clone();
let directory_service = directory_service.clone();
async move {
let path_info_service: Arc<dyn PathInfoService> =
Arc::from(MemoryPathInfoService::new(blob_service, directory_service));
let path_info_service: Arc<dyn PathInfoService> = Arc::from(
MemoryPathInfoService::new(blob_service.clone(), directory_service.clone()),
);
let nar_calculation_service =
Box::new(SimpleRenderer::new(blob_service, directory_service))
as Box<dyn NarCalculationService>;
// spin up a new DirectoryService
// spin up a new PathInfoService
let mut server = Server::builder();
let router = server.add_service(PathInfoServiceServer::new(
GRPCPathInfoServiceWrapper::new(path_info_service),
GRPCPathInfoServiceWrapper::new(path_info_service, nar_calculation_service),
));
router

View file

@ -1,4 +1,4 @@
use crate::nar::RenderError;
use crate::nar::{NarCalculationService, RenderError};
use crate::pathinfoservice::PathInfoService;
use crate::proto;
use futures::{stream::BoxStream, TryStreamExt};
@ -7,23 +7,26 @@ use tonic::{async_trait, Request, Response, Result, Status};
use tracing::{instrument, warn};
use tvix_castore::proto as castorepb;
pub struct GRPCPathInfoServiceWrapper<PS> {
inner: PS,
pub struct GRPCPathInfoServiceWrapper<PS, NS> {
path_info_service: PS,
// FUTUREWORK: allow exposing without allowing listing
nar_calculation_service: NS,
}
impl<PS> GRPCPathInfoServiceWrapper<PS> {
pub fn new(path_info_service: PS) -> Self {
impl<PS, NS> GRPCPathInfoServiceWrapper<PS, NS> {
pub fn new(path_info_service: PS, nar_calculation_service: NS) -> Self {
Self {
inner: path_info_service,
path_info_service,
nar_calculation_service,
}
}
}
#[async_trait]
impl<PS> proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper<PS>
impl<PS, NS> proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper<PS, NS>
where
PS: Deref<Target = dyn PathInfoService> + Send + Sync + 'static,
NS: NarCalculationService + Send + Sync + 'static,
{
type ListStream = BoxStream<'static, tonic::Result<proto::PathInfo, Status>>;
@ -39,7 +42,7 @@ where
.to_vec()
.try_into()
.map_err(|_e| Status::invalid_argument("invalid output digest length"))?;
match self.inner.get(digest).await {
match self.path_info_service.get(digest).await {
Ok(None) => Err(Status::not_found("PathInfo not found")),
Ok(Some(path_info)) => Ok(Response::new(path_info)),
Err(e) => {
@ -57,7 +60,7 @@ where
// Store the PathInfo in the client. Clients MUST validate the data
// they receive, so we don't validate additionally here.
match self.inner.put(path_info).await {
match self.path_info_service.put(path_info).await {
Ok(path_info_new) => Ok(Response::new(path_info_new)),
Err(e) => {
warn!(err = %e, "failed to put PathInfo");
@ -79,7 +82,7 @@ where
Err(Status::invalid_argument("invalid root node"))?
}
match self.inner.calculate_nar(&root_node).await {
match self.nar_calculation_service.calculate_nar(&root_node).await {
Ok((nar_size, nar_sha256)) => Ok(Response::new(proto::CalculateNarResponse {
nar_size,
nar_sha256: nar_sha256.to_vec().into(),
@ -99,7 +102,7 @@ where
_request: Request<proto::ListPathInfoRequest>,
) -> Result<Response<Self::ListStream>, Status> {
let stream = Box::pin(
self.inner
self.path_info_service
.list()
.map_err(|e| Status::internal(e.to_string())),
);

View file

@ -10,9 +10,10 @@ use tvix_castore::{
directoryservice::{self, DirectoryService},
};
use crate::nar::{NarCalculationService, SimpleRenderer};
use crate::pathinfoservice::{self, PathInfoService};
/// Construct the three store handles from their addrs.
/// Construct the store handles from their addrs.
pub async fn construct_services(
blob_service_addr: impl AsRef<str>,
directory_service_addr: impl AsRef<str>,
@ -21,6 +22,7 @@ pub async fn construct_services(
Arc<dyn BlobService>,
Arc<dyn DirectoryService>,
Box<dyn PathInfoService>,
Box<dyn NarCalculationService>,
)> {
let blob_service: Arc<dyn BlobService> = blobservice::from_addr(blob_service_addr.as_ref())
.await?
@ -36,7 +38,18 @@ pub async fn construct_services(
)
.await?;
Ok((blob_service, directory_service, path_info_service))
// TODO: grpc client also implements NarCalculationService
let nar_calculation_service = Box::new(SimpleRenderer::new(
blob_service.clone(),
directory_service.clone(),
)) as Box<dyn NarCalculationService>;
Ok((
blob_service,
directory_service,
path_info_service,
nar_calculation_service,
))
}
/// The inverse of [tokio_util::io::SyncIoBridge].