diff --git a/tvix/castore/src/composition.rs b/tvix/castore/src/composition.rs index daa8a5008..18a767284 100644 --- a/tvix/castore/src/composition.rs +++ b/tvix/castore/src/composition.rs @@ -454,11 +454,4 @@ impl Composition { *entry = Box::new(new_val); ret } - - pub fn context(&self) -> CompositionContext { - CompositionContext { - stack: vec![], - composition: Some(self), - } - } } diff --git a/tvix/cli/src/lib.rs b/tvix/cli/src/lib.rs index 070e88e35..800ffb4e0 100644 --- a/tvix/cli/src/lib.rs +++ b/tvix/cli/src/lib.rs @@ -60,7 +60,7 @@ pub fn init_io_handle(tokio_runtime: &tokio::runtime::Runtime, args: &Args) -> R Rc::new(TvixStoreIO::new( blob_service.clone(), directory_service.clone(), - path_info_service, + path_info_service.into(), nar_calculation_service.into(), build_service.into(), tokio_runtime.handle().clone(), diff --git a/tvix/glue/benches/eval.rs b/tvix/glue/benches/eval.rs index ee3d554dc..55f37f2d5 100644 --- a/tvix/glue/benches/eval.rs +++ b/tvix/glue/benches/eval.rs @@ -34,7 +34,7 @@ fn interpret(code: &str) { let tvix_store_io = Rc::new(TvixStoreIO::new( blob_service, directory_service, - path_info_service, + path_info_service.into(), nar_calculation_service.into(), Arc::::default(), TOKIO_RUNTIME.handle().clone(), diff --git a/tvix/glue/src/builtins/mod.rs b/tvix/glue/src/builtins/mod.rs index e1d1c9c84..e1c20e461 100644 --- a/tvix/glue/src/builtins/mod.rs +++ b/tvix/glue/src/builtins/mod.rs @@ -80,7 +80,7 @@ mod tests { let io = Rc::new(TvixStoreIO::new( blob_service, directory_service, - path_info_service, + path_info_service.into(), nar_calculation_service.into(), Arc::::default(), runtime.handle().clone(), diff --git a/tvix/glue/src/tests/mod.rs b/tvix/glue/src/tests/mod.rs index e9f21c329..142ae1a07 100644 --- a/tvix/glue/src/tests/mod.rs +++ b/tvix/glue/src/tests/mod.rs @@ -41,7 +41,7 @@ fn eval_test(code_path: PathBuf, expect_success: bool) { let tvix_store_io = Rc::new(TvixStoreIO::new( blob_service, directory_service, - path_info_service, + path_info_service.into(), nar_calculation_service.into(), Arc::new(DummyBuildService::default()), tokio_runtime.handle().clone(), diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs index cfc037e8d..e8c09af0e 100644 --- a/tvix/glue/src/tvix_store_io.rs +++ b/tvix/glue/src/tvix_store_io.rs @@ -648,7 +648,7 @@ mod tests { let io = Rc::new(TvixStoreIO::new( blob_service, directory_service, - path_info_service, + path_info_service.into(), nar_calculation_service.into(), Arc::::default(), tokio_runtime.handle().clone(), diff --git a/tvix/nar-bridge/src/bin/nar-bridge.rs b/tvix/nar-bridge/src/bin/nar-bridge.rs index 1e4223775..b24f155a7 100644 --- a/tvix/nar-bridge/src/bin/nar-bridge.rs +++ b/tvix/nar-bridge/src/bin/nar-bridge.rs @@ -1,6 +1,7 @@ use clap::Parser; use nar_bridge::AppState; use tracing::info; +use tvix_store::pathinfoservice::{CachePathInfoService, PathInfoService}; /// Expose the Nix HTTP Binary Cache protocol for a tvix-store. #[derive(Parser)] @@ -15,6 +16,12 @@ struct Cli { #[arg(long, env, default_value = "grpc+http://[::1]:8000")] path_info_service_addr: String, + /// URL to a PathInfoService that's considered "remote". + /// If set, the other one is considered "local", and a "cache" for the + /// "remote" one. + #[arg(long, env)] + remote_path_info_service_addr: Option, + /// The priority to announce at the `nix-cache-info` endpoint. /// A lower number means it's *more preferred. #[arg(long, env, default_value_t = 39)] @@ -55,7 +62,25 @@ async fn main() -> Result<(), Box> { ) .await?; - let state = AppState::new(blob_service, directory_service, path_info_service); + // if remote_path_info_service_addr has been specified, + // update path_info_service to point to a cache combining the two. + let path_info_service = if let Some(addr) = cli.remote_path_info_service_addr { + let remote_path_info_service = tvix_store::pathinfoservice::from_addr( + &addr, + blob_service.clone(), + directory_service.clone(), + ) + .await?; + + let path_info_service = + CachePathInfoService::new(path_info_service, remote_path_info_service); + + Box::new(path_info_service) as Box + } else { + path_info_service + }; + + let state = AppState::new(blob_service, directory_service, path_info_service.into()); let app = nar_bridge::gen_router(cli.priority).with_state(state); diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 99323d2a5..82c73f8f4 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -16,6 +16,7 @@ use tracing::{info, info_span, instrument, Level, Span}; use tracing_indicatif::span_ext::IndicatifSpanExt as _; use tvix_castore::import::fs::ingest_path; use tvix_store::nar::NarCalculationService; +use tvix_store::pathinfoservice::CachePathInfoService; use tvix_store::proto::NarInfo; use tvix_store::proto::PathInfo; @@ -209,38 +210,31 @@ async fn run_cli(cli: Cli) -> Result<(), Box { // initialize stores - let mut configs = tvix_store::utils::addrs_to_configs( - blob_service_addr, - directory_service_addr, - path_info_service_addr, - )?; + let (blob_service, directory_service, path_info_service, nar_calculation_service) = + tvix_store::utils::construct_services( + blob_service_addr, + directory_service_addr, + path_info_service_addr, + ) + .await?; // if remote_path_info_service_addr has been specified, // update path_info_service to point to a cache combining the two. - if let Some(addr) = remote_path_info_service_addr { - use tvix_store::composition::{with_registry, DeserializeWithRegistry, REG}; - use tvix_store::pathinfoservice::CachePathInfoServiceConfig; - - let remote_url = url::Url::parse(&addr)?; - let remote_config = with_registry(®, || remote_url.try_into())?; - - let local = configs.pathinfoservices.insert( - "default".into(), - DeserializeWithRegistry(Box::new(CachePathInfoServiceConfig { - near: "local".into(), - far: "remote".into(), - })), - ); - configs - .pathinfoservices - .insert("local".into(), local.unwrap()); - configs - .pathinfoservices - .insert("remote".into(), remote_config); - } + let path_info_service = if let Some(addr) = remote_path_info_service_addr { + let remote_path_info_service = tvix_store::pathinfoservice::from_addr( + &addr, + blob_service.clone(), + directory_service.clone(), + ) + .await?; - let (blob_service, directory_service, path_info_service, nar_calculation_service) = - tvix_store::utils::construct_services_from_configs(configs).await?; + let path_info_service = + CachePathInfoService::new(path_info_service, remote_path_info_service); + + Box::new(path_info_service) as Box + } else { + path_info_service + }; let mut server = Server::builder().layer( ServiceBuilder::new() @@ -263,7 +257,7 @@ async fn run_cli(cli: Cli) -> Result<(), Box Result<(), Box = path_info_service.into(); let nar_calculation_service: Arc = nar_calculation_service.into(); @@ -370,6 +365,9 @@ async fn run_cli(cli: Cli) -> Result<(), Box = serde_json::from_slice(reference_graph_json.as_slice())?; + // Arc the PathInfoService, as we clone it . + let path_info_service: Arc = path_info_service.into(); + let lookups_span = info_span!( "lookup pathinfos", "indicatif.pb_show" = tracing::field::Empty @@ -477,7 +475,7 @@ async fn run_cli(cli: Cli) -> Result<(), Box Result<(), Box, } +/// Represents configuration of [BigtablePathInfoService]. +/// This currently conflates both connect parameters and data model/client +/// behaviour parameters. +#[serde_as] +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct BigtableParameters { + project_id: String, + instance_name: String, + #[serde(default)] + is_read_only: bool, + #[serde(default = "default_channel_size")] + channel_size: usize, + + #[serde_as(as = "Option>")] + #[serde(default = "default_timeout")] + timeout: Option, + table_name: String, + family_name: String, + + #[serde(default = "default_app_profile_id")] + app_profile_id: String, +} + +impl BigtableParameters { + #[cfg(test)] + pub fn default_for_tests() -> Self { + Self { + project_id: "project-1".into(), + instance_name: "instance-1".into(), + is_read_only: false, + channel_size: default_channel_size(), + timeout: default_timeout(), + table_name: "table-1".into(), + family_name: "cf1".into(), + app_profile_id: default_app_profile_id(), + } + } +} + +fn default_app_profile_id() -> String { + "default".to_owned() +} + +fn default_channel_size() -> usize { + 4 +} + +fn default_timeout() -> Option { + Some(std::time::Duration::from_secs(4)) +} + impl BigtablePathInfoService { #[cfg(not(test))] pub async fn connect(params: BigtableParameters) -> Result { @@ -363,88 +412,3 @@ impl PathInfoService for BigtablePathInfoService { Box::pin(stream) } } - -/// Represents configuration of [BigtablePathInfoService]. -/// This currently conflates both connect parameters and data model/client -/// behaviour parameters. -#[serde_as] -#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] -pub struct BigtableParameters { - project_id: String, - instance_name: String, - #[serde(default)] - is_read_only: bool, - #[serde(default = "default_channel_size")] - channel_size: usize, - - #[serde_as(as = "Option>")] - #[serde(default = "default_timeout")] - timeout: Option, - table_name: String, - family_name: String, - - #[serde(default = "default_app_profile_id")] - app_profile_id: String, -} - -impl BigtableParameters { - #[cfg(test)] - pub fn default_for_tests() -> Self { - Self { - project_id: "project-1".into(), - instance_name: "instance-1".into(), - is_read_only: false, - channel_size: default_channel_size(), - timeout: default_timeout(), - table_name: "table-1".into(), - family_name: "cf1".into(), - app_profile_id: default_app_profile_id(), - } - } -} - -fn default_app_profile_id() -> String { - "default".to_owned() -} - -fn default_channel_size() -> usize { - 4 -} - -fn default_timeout() -> Option { - Some(std::time::Duration::from_secs(4)) -} - -#[async_trait] -impl ServiceBuilder for BigtableParameters { - type Output = dyn PathInfoService; - async fn build<'a>( - &'a self, - _instance_name: &str, - _context: &CompositionContext, - ) -> Result, Box> { - Ok(Arc::new( - BigtablePathInfoService::connect(self.clone()).await?, - )) - } -} - -impl TryFrom for BigtableParameters { - type Error = Box; - fn try_from(mut url: url::Url) -> Result { - // parse the instance name from the hostname. - let instance_name = url - .host_str() - .ok_or_else(|| Error::StorageError("instance name missing".into()))? - .to_string(); - - // … but add it to the query string now, so we just need to parse that. - url.query_pairs_mut() - .append_pair("instance_name", &instance_name); - - let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default()) - .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?; - - Ok(params) - } -} diff --git a/tvix/store/src/pathinfoservice/combinators.rs b/tvix/store/src/pathinfoservice/combinators.rs index bb5595f72..664144ef4 100644 --- a/tvix/store/src/pathinfoservice/combinators.rs +++ b/tvix/store/src/pathinfoservice/combinators.rs @@ -1,11 +1,8 @@ -use std::sync::Arc; - use crate::proto::PathInfo; use futures::stream::BoxStream; use nix_compat::nixbase32; use tonic::async_trait; use tracing::{debug, instrument}; -use tvix_castore::composition::{CompositionContext, ServiceBuilder}; use tvix_castore::Error; use super::PathInfoService; @@ -64,41 +61,6 @@ where } } -#[derive(serde::Deserialize)] -pub struct CacheConfig { - pub near: String, - pub far: String, -} - -impl TryFrom for CacheConfig { - type Error = Box; - fn try_from(_url: url::Url) -> Result { - Err(Error::StorageError( - "Instantiating a CombinedPathInfoService from a url is not supported".into(), - ) - .into()) - } -} - -#[async_trait] -impl ServiceBuilder for CacheConfig { - type Output = dyn PathInfoService; - async fn build<'a>( - &'a self, - _instance_name: &str, - context: &CompositionContext, - ) -> Result, Box> { - let (near, far) = futures::join!( - context.resolve(self.near.clone()), - context.resolve(self.far.clone()) - ); - Ok(Arc::new(Cache { - near: near?, - far: far?, - })) - } -} - #[cfg(test)] mod test { use std::num::NonZeroUsize; diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs index 5635c226c..9173d25d0 100644 --- a/tvix/store/src/pathinfoservice/from_addr.rs +++ b/tvix/store/src/pathinfoservice/from_addr.rs @@ -1,10 +1,13 @@ -use super::PathInfoService; +use crate::proto::path_info_service_client::PathInfoServiceClient; -use crate::composition::{ - with_registry, CompositionContext, DeserializeWithRegistry, ServiceBuilder, REG, +use super::{ + GRPCPathInfoService, MemoryPathInfoService, NixHTTPPathInfoService, PathInfoService, + SledPathInfoService, }; + +use nix_compat::narinfo; use std::sync::Arc; -use tvix_castore::Error; +use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error}; use url::Url; /// Constructs a new instance of a [PathInfoService] from an URI. @@ -31,21 +34,113 @@ use url::Url; /// these also need to be passed in. pub async fn from_addr( uri: &str, - context: Option<&CompositionContext<'_>>, -) -> Result, Box> { + blob_service: Arc, + directory_service: Arc, +) -> Result, Error> { #[allow(unused_mut)] let mut url = Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?; - let path_info_service_config = with_registry(®, || { - >>>::try_from( - url, - ) - })? - .0; - let path_info_service = path_info_service_config - .build("anonymous", context.unwrap_or(&CompositionContext::blank())) - .await?; + let path_info_service: Box = match url.scheme() { + "memory" => { + // memory doesn't support host or path in the URL. + if url.has_host() || !url.path().is_empty() { + return Err(Error::StorageError("invalid url".to_string())); + } + Box::::default() + } + "sled" => { + // sled doesn't support host, and a path can be provided (otherwise + // it'll live in memory only). + if url.has_host() { + return Err(Error::StorageError("no host allowed".to_string())); + } + + if url.path() == "/" { + return Err(Error::StorageError( + "cowardly refusing to open / with sled".to_string(), + )); + } + + // TODO: expose other parameters as URL parameters? + + Box::new(if url.path().is_empty() { + SledPathInfoService::new_temporary() + .map_err(|e| Error::StorageError(e.to_string()))? + } else { + SledPathInfoService::new(url.path()) + .map_err(|e| Error::StorageError(e.to_string()))? + }) + } + "nix+http" | "nix+https" => { + // Stringify the URL and remove the nix+ prefix. + // We can't use `url.set_scheme(rest)`, as it disallows + // setting something http(s) that previously wasn't. + let new_url = Url::parse(url.to_string().strip_prefix("nix+").unwrap()).unwrap(); + + let mut nix_http_path_info_service = + NixHTTPPathInfoService::new(new_url, blob_service, directory_service); + + let pairs = &url.query_pairs(); + for (k, v) in pairs.into_iter() { + if k == "trusted-public-keys" { + let pubkey_strs: Vec<_> = v.split_ascii_whitespace().collect(); + + let mut pubkeys: Vec = Vec::with_capacity(pubkey_strs.len()); + for pubkey_str in pubkey_strs { + pubkeys.push(narinfo::PubKey::parse(pubkey_str).map_err(|e| { + Error::StorageError(format!("invalid public key: {e}")) + })?); + } + + nix_http_path_info_service.set_public_keys(pubkeys); + } + } + + Box::new(nix_http_path_info_service) + } + scheme if scheme.starts_with("grpc+") => { + // schemes starting with grpc+ go to the GRPCPathInfoService. + // That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. + // - In the case of unix sockets, there must be a path, but may not be a host. + // - In the case of non-unix sockets, there must be a host, but no path. + // Constructing the channel is handled by tvix_castore::channel::from_url. + Box::new(GRPCPathInfoService::from_client( + PathInfoServiceClient::with_interceptor( + tvix_castore::tonic::channel_from_url(&url).await?, + tvix_tracing::propagate::tonic::send_trace, + ), + )) + } + #[cfg(feature = "cloud")] + "bigtable" => { + use super::bigtable::BigtableParameters; + use super::BigtablePathInfoService; + + // parse the instance name from the hostname. + let instance_name = url + .host_str() + .ok_or_else(|| Error::StorageError("instance name missing".into()))? + .to_string(); + + // … but add it to the query string now, so we just need to parse that. + url.query_pairs_mut() + .append_pair("instance_name", &instance_name); + + let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default()) + .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?; + + Box::new( + BigtablePathInfoService::connect(params) + .await + .map_err(|e| Error::StorageError(e.to_string()))?, + ) + } + _ => Err(Error::StorageError(format!( + "unknown scheme: {}", + url.scheme() + )))?, + }; Ok(path_info_service) } @@ -53,12 +148,14 @@ pub async fn from_addr( #[cfg(test)] mod tests { use super::from_addr; - use crate::composition::{Composition, DeserializeWithRegistry, ServiceBuilder}; use lazy_static::lazy_static; use rstest::rstest; + use std::sync::Arc; use tempfile::TempDir; - use tvix_castore::blobservice::{BlobService, MemoryBlobServiceConfig}; - use tvix_castore::directoryservice::{DirectoryService, MemoryDirectoryServiceConfig}; + use tvix_castore::{ + blobservice::{BlobService, MemoryBlobService}, + directoryservice::{DirectoryService, MemoryDirectoryService}, + }; lazy_static! { static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap(); @@ -127,19 +224,11 @@ mod tests { )] #[tokio::test] async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] exp_succeed: bool) { - let mut comp = Composition::default(); - comp.extend(vec![( - "default".into(), - DeserializeWithRegistry(Box::new(MemoryBlobServiceConfig {}) - as Box>), - )]); - comp.extend(vec![( - "default".into(), - DeserializeWithRegistry(Box::new(MemoryDirectoryServiceConfig {}) - as Box>), - )]); - - let resp = from_addr(uri_str, Some(&comp.context())).await; + let blob_service: Arc = Arc::from(MemoryBlobService::default()); + let directory_service: Arc = + Arc::from(MemoryDirectoryService::default()); + + let resp = from_addr(uri_str, blob_service, directory_service).await; if exp_succeed { resp.expect("should succeed"); diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs index 2ac0e4330..bcee49aac 100644 --- a/tvix/store/src/pathinfoservice/grpc.rs +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -6,11 +6,9 @@ use crate::{ use async_stream::try_stream; use futures::stream::BoxStream; use nix_compat::nixbase32; -use std::sync::Arc; use tonic::{async_trait, Code}; use tracing::{instrument, Span}; use tracing_indicatif::span_ext::IndicatifSpanExt; -use tvix_castore::composition::{CompositionContext, ServiceBuilder}; use tvix_castore::{proto as castorepb, Error}; /// Connects to a (remote) tvix-store PathInfoService over gRPC. @@ -151,40 +149,6 @@ where } } -#[derive(serde::Deserialize, Debug)] -#[serde(deny_unknown_fields)] -pub struct GRPCPathInfoServiceConfig { - url: String, -} - -impl TryFrom for GRPCPathInfoServiceConfig { - type Error = Box; - fn try_from(url: url::Url) -> Result { - // normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. - // - In the case of unix sockets, there must be a path, but may not be a host. - // - In the case of non-unix sockets, there must be a host, but no path. - // Constructing the channel is handled by tvix_castore::channel::from_url. - Ok(GRPCPathInfoServiceConfig { - url: url.to_string(), - }) - } -} - -#[async_trait] -impl ServiceBuilder for GRPCPathInfoServiceConfig { - type Output = dyn PathInfoService; - async fn build<'a>( - &'a self, - _instance_name: &str, - _context: &CompositionContext, - ) -> Result, Box> { - let client = proto::path_info_service_client::PathInfoServiceClient::new( - tvix_castore::tonic::channel_from_url(&self.url.parse()?).await?, - ); - Ok(Arc::new(GRPCPathInfoService::from_client(client))) - } -} - #[cfg(test)] mod tests { use crate::pathinfoservice::tests::make_grpc_path_info_service_client; diff --git a/tvix/store/src/pathinfoservice/lru.rs b/tvix/store/src/pathinfoservice/lru.rs index 39c592bc9..da674f497 100644 --- a/tvix/store/src/pathinfoservice/lru.rs +++ b/tvix/store/src/pathinfoservice/lru.rs @@ -9,7 +9,6 @@ use tonic::async_trait; use tracing::instrument; use crate::proto::PathInfo; -use tvix_castore::composition::{CompositionContext, ServiceBuilder}; use tvix_castore::Error; use super::PathInfoService; @@ -61,34 +60,6 @@ impl PathInfoService for LruPathInfoService { } } -#[derive(serde::Deserialize, Debug)] -#[serde(deny_unknown_fields)] -pub struct LruPathInfoServiceConfig { - capacity: NonZeroUsize, -} - -impl TryFrom for LruPathInfoServiceConfig { - type Error = Box; - fn try_from(_url: url::Url) -> Result { - Err(Error::StorageError( - "Instantiating a LruPathInfoService from a url is not supported".into(), - ) - .into()) - } -} - -#[async_trait] -impl ServiceBuilder for LruPathInfoServiceConfig { - type Output = dyn PathInfoService; - async fn build<'a>( - &'a self, - _instance_name: &str, - _context: &CompositionContext, - ) -> Result, Box> { - Ok(Arc::new(LruPathInfoService::with_capacity(self.capacity))) - } -} - #[cfg(test)] mod test { use std::num::NonZeroUsize; diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs index 3fabd239c..3de3221df 100644 --- a/tvix/store/src/pathinfoservice/memory.rs +++ b/tvix/store/src/pathinfoservice/memory.rs @@ -7,7 +7,6 @@ use std::{collections::HashMap, sync::Arc}; use tokio::sync::RwLock; use tonic::async_trait; use tracing::instrument; -use tvix_castore::composition::{CompositionContext, ServiceBuilder}; use tvix_castore::Error; #[derive(Default)] @@ -60,30 +59,3 @@ impl PathInfoService for MemoryPathInfoService { }) } } - -#[derive(serde::Deserialize, Debug)] -#[serde(deny_unknown_fields)] -pub struct MemoryPathInfoServiceConfig {} - -impl TryFrom for MemoryPathInfoServiceConfig { - type Error = Box; - fn try_from(url: url::Url) -> Result { - // memory doesn't support host or path in the URL. - if url.has_host() || !url.path().is_empty() { - return Err(Error::StorageError("invalid url".to_string()).into()); - } - Ok(MemoryPathInfoServiceConfig {}) - } -} - -#[async_trait] -impl ServiceBuilder for MemoryPathInfoServiceConfig { - type Output = dyn PathInfoService; - async fn build<'a>( - &'a self, - _instance_name: &str, - _context: &CompositionContext, - ) -> Result, Box> { - Ok(Arc::new(MemoryPathInfoService::default())) - } -} diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs index 70f752f22..574bcc0b8 100644 --- a/tvix/store/src/pathinfoservice/mod.rs +++ b/tvix/store/src/pathinfoservice/mod.rs @@ -14,26 +14,22 @@ mod tests; use futures::stream::BoxStream; use tonic::async_trait; -use tvix_castore::composition::{Registry, ServiceBuilder}; use tvix_castore::Error; -use crate::nar::NarCalculationService; use crate::proto::PathInfo; -pub use self::combinators::{ - Cache as CachePathInfoService, CacheConfig as CachePathInfoServiceConfig, -}; +pub use self::combinators::Cache as CachePathInfoService; pub use self::from_addr::from_addr; -pub use self::grpc::{GRPCPathInfoService, GRPCPathInfoServiceConfig}; -pub use self::lru::{LruPathInfoService, LruPathInfoServiceConfig}; -pub use self::memory::{MemoryPathInfoService, MemoryPathInfoServiceConfig}; -pub use self::nix_http::{NixHTTPPathInfoService, NixHTTPPathInfoServiceConfig}; -pub use self::sled::{SledPathInfoService, SledPathInfoServiceConfig}; +pub use self::grpc::GRPCPathInfoService; +pub use self::lru::LruPathInfoService; +pub use self::memory::MemoryPathInfoService; +pub use self::nix_http::NixHTTPPathInfoService; +pub use self::sled::SledPathInfoService; #[cfg(feature = "cloud")] mod bigtable; #[cfg(feature = "cloud")] -pub use self::bigtable::{BigtableParameters, BigtablePathInfoService}; +pub use self::bigtable::BigtablePathInfoService; #[cfg(any(feature = "fuse", feature = "virtiofs"))] pub use self::fs::make_fs; @@ -56,16 +52,12 @@ pub trait PathInfoService: Send + Sync { /// Rust doesn't support this as a generic in traits yet. This is the same thing that /// [async_trait] generates, but for streams instead of futures. fn list(&self) -> BoxStream<'static, Result>; - - fn nar_calculation_service(&self) -> Option> { - None - } } #[async_trait] impl PathInfoService for A where - A: AsRef + Send + Sync + 'static, + A: AsRef + Send + Sync, { async fn get(&self, digest: [u8; 20]) -> Result, Error> { self.as_ref().get(digest).await @@ -79,19 +71,3 @@ where self.as_ref().list() } } - -/// Registers the builtin PathInfoService implementations with the registry -pub(crate) fn register_pathinfo_services(reg: &mut Registry) { - reg.register::>, CachePathInfoServiceConfig>("cache"); - reg.register::>, GRPCPathInfoServiceConfig>("grpc"); - reg.register::>, LruPathInfoServiceConfig>("lru"); - reg.register::>, MemoryPathInfoServiceConfig>("memory"); - reg.register::>, NixHTTPPathInfoServiceConfig>("nix"); - reg.register::>, SledPathInfoServiceConfig>("sled"); - #[cfg(feature = "cloud")] - { - reg.register::>, BigtableParameters>( - "bigtable", - ); - } -} diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs index af9234bc0..57fe37f44 100644 --- a/tvix/store/src/pathinfoservice/nix_http.rs +++ b/tvix/store/src/pathinfoservice/nix_http.rs @@ -7,15 +7,12 @@ use nix_compat::{ nixhash::NixHash, }; use reqwest::StatusCode; -use std::sync::Arc; use tokio::io::{self, AsyncRead}; use tonic::async_trait; use tracing::{debug, instrument, warn}; -use tvix_castore::composition::{CompositionContext, ServiceBuilder}; use tvix_castore::{ blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error, }; -use url::Url; /// NixHTTPPathInfoService acts as a bridge in between the Nix HTTP Binary cache /// protocol provided by Nix binary caches such as cache.nixos.org, and the Tvix @@ -252,71 +249,3 @@ where })) } } - -#[derive(serde::Deserialize)] -pub struct NixHTTPPathInfoServiceConfig { - base_url: String, - blob_service: String, - directory_service: String, - #[serde(default)] - /// An optional list of [narinfo::PubKey]. - /// If set, the .narinfo files received need to have correct signature by at least one of these. - public_keys: Option>, -} - -impl TryFrom for NixHTTPPathInfoServiceConfig { - type Error = Box; - fn try_from(url: Url) -> Result { - let mut public_keys: Option> = None; - for (_, v) in url - .query_pairs() - .into_iter() - .filter(|(k, _)| k == "trusted-public-keys") - { - public_keys - .get_or_insert(Default::default()) - .extend(v.split_ascii_whitespace().map(ToString::to_string)); - } - Ok(NixHTTPPathInfoServiceConfig { - // Stringify the URL and remove the nix+ prefix. - // We can't use `url.set_scheme(rest)`, as it disallows - // setting something http(s) that previously wasn't. - base_url: url.to_string().strip_prefix("nix+").unwrap().to_string(), - blob_service: "default".to_string(), - directory_service: "default".to_string(), - public_keys, - }) - } -} - -#[async_trait] -impl ServiceBuilder for NixHTTPPathInfoServiceConfig { - type Output = dyn PathInfoService; - async fn build<'a>( - &'a self, - _instance_name: &str, - context: &CompositionContext, - ) -> Result, Box> { - let (blob_service, directory_service) = futures::join!( - context.resolve(self.blob_service.clone()), - context.resolve(self.directory_service.clone()) - ); - let mut svc = NixHTTPPathInfoService::new( - Url::parse(&self.base_url)?, - blob_service?, - directory_service?, - ); - if let Some(public_keys) = &self.public_keys { - svc.set_public_keys( - public_keys - .iter() - .map(|pubkey_str| { - narinfo::PubKey::parse(pubkey_str) - .map_err(|e| Error::StorageError(format!("invalid public key: {e}"))) - }) - .collect::, Error>>()?, - ); - } - Ok(Arc::new(svc)) - } -} diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs index 4255bfd1d..96ade1816 100644 --- a/tvix/store/src/pathinfoservice/sled.rs +++ b/tvix/store/src/pathinfoservice/sled.rs @@ -5,10 +5,8 @@ use futures::stream::BoxStream; use nix_compat::nixbase32; use prost::Message; use std::path::Path; -use std::sync::Arc; use tonic::async_trait; use tracing::{instrument, warn}; -use tvix_castore::composition::{CompositionContext, ServiceBuilder}; use tvix_castore::Error; /// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled). @@ -116,69 +114,3 @@ impl PathInfoService for SledPathInfoService { }) } } - -#[derive(serde::Deserialize)] -#[serde(deny_unknown_fields)] -pub struct SledPathInfoServiceConfig { - is_temporary: bool, - #[serde(default)] - /// required when is_temporary = false - path: Option, -} - -impl TryFrom for SledPathInfoServiceConfig { - type Error = Box; - fn try_from(url: url::Url) -> Result { - // sled doesn't support host, and a path can be provided (otherwise - // it'll live in memory only). - if url.has_host() { - return Err(Error::StorageError("no host allowed".to_string()).into()); - } - - // TODO: expose compression and other parameters as URL parameters? - - Ok(if url.path().is_empty() { - SledPathInfoServiceConfig { - is_temporary: true, - path: None, - } - } else { - SledPathInfoServiceConfig { - is_temporary: false, - path: Some(url.path().to_string()), - } - }) - } -} - -#[async_trait] -impl ServiceBuilder for SledPathInfoServiceConfig { - type Output = dyn PathInfoService; - async fn build<'a>( - &'a self, - _instance_name: &str, - _context: &CompositionContext, - ) -> Result, Box> { - match self { - SledPathInfoServiceConfig { - is_temporary: true, - path: None, - } => Ok(Arc::new(SledPathInfoService::new_temporary()?)), - SledPathInfoServiceConfig { - is_temporary: true, - path: Some(_), - } => Err( - Error::StorageError("Temporary SledPathInfoService can not have path".into()) - .into(), - ), - SledPathInfoServiceConfig { - is_temporary: false, - path: None, - } => Err(Error::StorageError("SledPathInfoService is missing path".into()).into()), - SledPathInfoServiceConfig { - is_temporary: false, - path: Some(path), - } => Ok(Arc::new(SledPathInfoService::new(path)?)), - } - } -} diff --git a/tvix/store/src/utils.rs b/tvix/store/src/utils.rs index a09786386..d82f2214f 100644 --- a/tvix/store/src/utils.rs +++ b/tvix/store/src/utils.rs @@ -1,60 +1,18 @@ +use std::sync::Arc; use std::{ - collections::HashMap, pin::Pin, - sync::Arc, task::{self, Poll}, }; use tokio::io::{self, AsyncWrite}; -use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService}; +use tvix_castore::{ + blobservice::{self, BlobService}, + directoryservice::{self, DirectoryService}, +}; use url::Url; -use crate::composition::{ - with_registry, Composition, DeserializeWithRegistry, ServiceBuilder, REG, -}; use crate::nar::{NarCalculationService, SimpleRenderer}; -use crate::pathinfoservice::PathInfoService; - -#[derive(serde::Deserialize, Default)] -pub struct CompositionConfigs { - pub blobservices: - HashMap>>>, - pub directoryservices: HashMap< - String, - DeserializeWithRegistry>>, - >, - pub pathinfoservices: HashMap< - String, - DeserializeWithRegistry>>, - >, -} - -pub fn addrs_to_configs( - blob_service_addr: impl AsRef, - directory_service_addr: impl AsRef, - path_info_service_addr: impl AsRef, -) -> Result> { - let mut configs: CompositionConfigs = Default::default(); - - let blob_service_url = Url::parse(blob_service_addr.as_ref())?; - let directory_service_url = Url::parse(directory_service_addr.as_ref())?; - let path_info_service_url = Url::parse(path_info_service_addr.as_ref())?; - - configs.blobservices.insert( - "default".into(), - with_registry(®, || blob_service_url.try_into())?, - ); - configs.directoryservices.insert( - "default".into(), - with_registry(®, || directory_service_url.try_into())?, - ); - configs.pathinfoservices.insert( - "default".into(), - with_registry(®, || path_info_service_url.try_into())?, - ); - - Ok(configs) -} +use crate::pathinfoservice::{self, PathInfoService}; /// Construct the store handles from their addrs. pub async fn construct_services( @@ -65,52 +23,49 @@ pub async fn construct_services( ( Arc, Arc, - Arc, + Box, Box, ), Box, > { - let configs = addrs_to_configs( - blob_service_addr, - directory_service_addr, - path_info_service_addr, - )?; - construct_services_from_configs(configs).await -} - -/// Construct the store handles from their addrs. -pub async fn construct_services_from_configs( - configs: CompositionConfigs, -) -> Result< - ( - Arc, - Arc, - Arc, - Box, - ), - Box, -> { - let mut comp = Composition::default(); - - comp.extend(configs.blobservices); - comp.extend(configs.directoryservices); - comp.extend(configs.pathinfoservices); - - let blob_service: Arc = comp.build("default").await?; - let directory_service: Arc = comp.build("default").await?; - let path_info_service: Arc = comp.build("default").await?; + let blob_service: Arc = + blobservice::from_addr(blob_service_addr.as_ref()).await?; + let directory_service: Arc = + directoryservice::from_addr(directory_service_addr.as_ref()).await?; + + let path_info_service = pathinfoservice::from_addr( + path_info_service_addr.as_ref(), + blob_service.clone(), + directory_service.clone(), + ) + .await?; // HACK: The grpc client also implements NarCalculationService, and we // really want to use it (otherwise we'd need to fetch everything again for hashing). // Until we revamped store composition and config, detect this special case here. - let nar_calculation_service: Box = path_info_service - .nar_calculation_service() - .unwrap_or_else(|| { + let nar_calculation_service: Box = { + use crate::pathinfoservice::GRPCPathInfoService; + use crate::proto::path_info_service_client::PathInfoServiceClient; + + let url = Url::parse(path_info_service_addr.as_ref()) + .map_err(|e| io::Error::other(e.to_string()))?; + + if url.scheme().starts_with("grpc+") { + Box::new(GRPCPathInfoService::from_client( + PathInfoServiceClient::with_interceptor( + tvix_castore::tonic::channel_from_url(&url) + .await + .map_err(|e| io::Error::other(e.to_string()))?, + tvix_tracing::propagate::tonic::send_trace, + ), + )) + } else { Box::new(SimpleRenderer::new( blob_service.clone(), directory_service.clone(), - )) - }); + )) as Box + } + }; Ok(( blob_service,