refactor(tvix/store): use composition in tvix_store crate

Change-Id: Ie6290b296baba2b987f1a61c9bb4c78549ac11f1
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11983
Reviewed-by: flokli <flokli@flokli.de>
Autosubmit: yuka <yuka@yuka.dev>
Tested-by: BuildkiteCI
This commit is contained in:
Yureka 2024-07-19 15:22:48 +02:00 committed by clbot
parent 6180a7cecf
commit 8b77c7fcd7
20 changed files with 569 additions and 251 deletions

View file

@ -454,4 +454,11 @@ impl Composition {
*entry = Box::new(new_val); *entry = Box::new(new_val);
ret ret
} }
/// Builds a [CompositionContext] that points back at this composition and
/// starts out with an empty stack.
pub fn context(&self) -> CompositionContext {
    let stack = Vec::new();
    CompositionContext {
        composition: Some(self),
        stack,
    }
}
} }

View file

@ -60,7 +60,7 @@ pub fn init_io_handle(tokio_runtime: &tokio::runtime::Runtime, args: &Args) -> R
Rc::new(TvixStoreIO::new( Rc::new(TvixStoreIO::new(
blob_service.clone(), blob_service.clone(),
directory_service.clone(), directory_service.clone(),
path_info_service.into(), path_info_service,
nar_calculation_service.into(), nar_calculation_service.into(),
build_service.into(), build_service.into(),
tokio_runtime.handle().clone(), tokio_runtime.handle().clone(),

View file

@ -34,7 +34,7 @@ fn interpret(code: &str) {
let tvix_store_io = Rc::new(TvixStoreIO::new( let tvix_store_io = Rc::new(TvixStoreIO::new(
blob_service, blob_service,
directory_service, directory_service,
path_info_service.into(), path_info_service,
nar_calculation_service.into(), nar_calculation_service.into(),
Arc::<DummyBuildService>::default(), Arc::<DummyBuildService>::default(),
TOKIO_RUNTIME.handle().clone(), TOKIO_RUNTIME.handle().clone(),

View file

@ -80,7 +80,7 @@ mod tests {
let io = Rc::new(TvixStoreIO::new( let io = Rc::new(TvixStoreIO::new(
blob_service, blob_service,
directory_service, directory_service,
path_info_service.into(), path_info_service,
nar_calculation_service.into(), nar_calculation_service.into(),
Arc::<DummyBuildService>::default(), Arc::<DummyBuildService>::default(),
runtime.handle().clone(), runtime.handle().clone(),

View file

@ -41,7 +41,7 @@ fn eval_test(code_path: PathBuf, expect_success: bool) {
let tvix_store_io = Rc::new(TvixStoreIO::new( let tvix_store_io = Rc::new(TvixStoreIO::new(
blob_service, blob_service,
directory_service, directory_service,
path_info_service.into(), path_info_service,
nar_calculation_service.into(), nar_calculation_service.into(),
Arc::new(DummyBuildService::default()), Arc::new(DummyBuildService::default()),
tokio_runtime.handle().clone(), tokio_runtime.handle().clone(),

View file

@ -648,7 +648,7 @@ mod tests {
let io = Rc::new(TvixStoreIO::new( let io = Rc::new(TvixStoreIO::new(
blob_service, blob_service,
directory_service, directory_service,
path_info_service.into(), path_info_service,
nar_calculation_service.into(), nar_calculation_service.into(),
Arc::<DummyBuildService>::default(), Arc::<DummyBuildService>::default(),
tokio_runtime.handle().clone(), tokio_runtime.handle().clone(),

View file

@ -55,7 +55,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
) )
.await?; .await?;
let state = AppState::new(blob_service, directory_service, path_info_service.into()); let state = AppState::new(blob_service, directory_service, path_info_service);
let app = nar_bridge::gen_router(cli.priority).with_state(state); let app = nar_bridge::gen_router(cli.priority).with_state(state);

View file

@ -16,7 +16,6 @@ use tracing::{info, info_span, instrument, Level, Span};
use tracing_indicatif::span_ext::IndicatifSpanExt as _; use tracing_indicatif::span_ext::IndicatifSpanExt as _;
use tvix_castore::import::fs::ingest_path; use tvix_castore::import::fs::ingest_path;
use tvix_store::nar::NarCalculationService; use tvix_store::nar::NarCalculationService;
use tvix_store::pathinfoservice::CachePathInfoService;
use tvix_store::proto::NarInfo; use tvix_store::proto::NarInfo;
use tvix_store::proto::PathInfo; use tvix_store::proto::PathInfo;
@ -210,31 +209,38 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
remote_path_info_service_addr, remote_path_info_service_addr,
} => { } => {
// initialize stores // initialize stores
let (blob_service, directory_service, path_info_service, nar_calculation_service) = let mut configs = tvix_store::utils::addrs_to_configs(
tvix_store::utils::construct_services(
blob_service_addr, blob_service_addr,
directory_service_addr, directory_service_addr,
path_info_service_addr, path_info_service_addr,
) )?;
.await?;
// if remote_path_info_service_addr has been specified, // if remote_path_info_service_addr has been specified,
// update path_info_service to point to a cache combining the two. // update path_info_service to point to a cache combining the two.
let path_info_service = if let Some(addr) = remote_path_info_service_addr { if let Some(addr) = remote_path_info_service_addr {
let remote_path_info_service = tvix_store::pathinfoservice::from_addr( use tvix_store::composition::{with_registry, DeserializeWithRegistry, REG};
&addr, use tvix_store::pathinfoservice::CachePathInfoServiceConfig;
blob_service.clone(),
directory_service.clone(),
)
.await?;
let path_info_service = let remote_url = url::Url::parse(&addr)?;
CachePathInfoService::new(path_info_service, remote_path_info_service); let remote_config = with_registry(&REG, || remote_url.try_into())?;
Box::new(path_info_service) as Box<dyn PathInfoService> let local = configs.pathinfoservices.insert(
} else { "default".into(),
path_info_service DeserializeWithRegistry(Box::new(CachePathInfoServiceConfig {
}; near: "local".into(),
far: "remote".into(),
})),
);
configs
.pathinfoservices
.insert("local".into(), local.unwrap());
configs
.pathinfoservices
.insert("remote".into(), remote_config);
}
let (blob_service, directory_service, path_info_service, nar_calculation_service) =
tvix_store::utils::construct_services_from_configs(configs).await?;
let mut server = Server::builder().layer( let mut server = Server::builder().layer(
ServiceBuilder::new() ServiceBuilder::new()
@ -257,7 +263,7 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
GRPCDirectoryServiceWrapper::new(directory_service), GRPCDirectoryServiceWrapper::new(directory_service),
)) ))
.add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new( .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new(
Arc::from(path_info_service), path_info_service,
nar_calculation_service, nar_calculation_service,
))); )));
@ -302,8 +308,7 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
) )
.await?; .await?;
// Arc PathInfoService and NarCalculationService, as we clone it . // Arc NarCalculationService, as we clone it .
let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
let nar_calculation_service: Arc<dyn NarCalculationService> = let nar_calculation_service: Arc<dyn NarCalculationService> =
nar_calculation_service.into(); nar_calculation_service.into();
@ -365,9 +370,6 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
let reference_graph: ReferenceGraph<'_> = let reference_graph: ReferenceGraph<'_> =
serde_json::from_slice(reference_graph_json.as_slice())?; serde_json::from_slice(reference_graph_json.as_slice())?;
// Arc the PathInfoService, as we clone it .
let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
let lookups_span = info_span!( let lookups_span = info_span!(
"lookup pathinfos", "lookup pathinfos",
"indicatif.pb_show" = tracing::field::Empty "indicatif.pb_show" = tracing::field::Empty
@ -475,7 +477,7 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
let fs = make_fs( let fs = make_fs(
blob_service, blob_service,
directory_service, directory_service,
Arc::from(path_info_service), path_info_service,
list_root, list_root,
show_xattr, show_xattr,
); );
@ -523,7 +525,7 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
let fs = make_fs( let fs = make_fs(
blob_service, blob_service,
directory_service, directory_service,
Arc::from(path_info_service), path_info_service,
list_root, list_root,
show_xattr, show_xattr,
); );

View file

@ -0,0 +1,22 @@
use lazy_static::lazy_static;
pub use tvix_castore::composition::*;
lazy_static! {
    /// The provided registry of tvix_store, which has all the builtin
    /// tvix_castore (BlobStore/DirectoryStore) and tvix_store
    /// (PathInfoService) implementations.
    ///
    /// The value starts from `Registry`'s `Default` and is then filled in by
    /// [add_default_services].
    pub static ref REG: Registry = {
        let mut reg = Default::default();
        add_default_services(&mut reg);
        reg
    };
}
/// Register the builtin services of tvix_castore and tvix_store with the given
/// registry. This is useful for creating your own registry with the builtin
/// types _and_ extra third party types.
pub fn add_default_services(reg: &mut Registry) {
    // The tvix_castore builtins are registered first …
    tvix_castore::composition::add_default_services(reg);
    // … followed by the PathInfoService implementations of this crate.
    crate::pathinfoservice::register_pathinfo_services(reg);
}

View file

@ -1,3 +1,4 @@
pub mod composition;
pub mod import; pub mod import;
pub mod nar; pub mod nar;
pub mod pathinfoservice; pub mod pathinfoservice;

View file

@ -10,15 +10,17 @@ use nix_compat::nixbase32;
use prost::Message; use prost::Message;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DurationSeconds}; use serde_with::{serde_as, DurationSeconds};
use std::sync::Arc;
use tonic::async_trait; use tonic::async_trait;
use tracing::{instrument, trace}; use tracing::{instrument, trace};
use tvix_castore::composition::{CompositionContext, ServiceBuilder};
use tvix_castore::Error; use tvix_castore::Error;
/// There should not be more than 10 MiB in a single cell. /// There should not be more than 10 MiB in a single cell.
/// https://cloud.google.com/bigtable/docs/schema-design#cells /// https://cloud.google.com/bigtable/docs/schema-design#cells
const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024; const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024;
/// Provides a [DirectoryService] implementation using /// Provides a [PathInfoService] implementation using
/// [Bigtable](https://cloud.google.com/bigtable/docs/) /// [Bigtable](https://cloud.google.com/bigtable/docs/)
/// as an underlying K/V store. /// as an underlying K/V store.
/// ///
@ -44,57 +46,6 @@ pub struct BigtablePathInfoService {
emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>, emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
} }
/// Represents configuration of [BigtablePathInfoService].
/// This currently conflates both connect parameters and data model/client
/// behaviour parameters.
#[serde_as]
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
pub struct BigtableParameters {
project_id: String,
instance_name: String,
#[serde(default)]
is_read_only: bool,
#[serde(default = "default_channel_size")]
channel_size: usize,
#[serde_as(as = "Option<DurationSeconds<String>>")]
#[serde(default = "default_timeout")]
timeout: Option<std::time::Duration>,
table_name: String,
family_name: String,
#[serde(default = "default_app_profile_id")]
app_profile_id: String,
}
impl BigtableParameters {
#[cfg(test)]
pub fn default_for_tests() -> Self {
Self {
project_id: "project-1".into(),
instance_name: "instance-1".into(),
is_read_only: false,
channel_size: default_channel_size(),
timeout: default_timeout(),
table_name: "table-1".into(),
family_name: "cf1".into(),
app_profile_id: default_app_profile_id(),
}
}
}
fn default_app_profile_id() -> String {
"default".to_owned()
}
fn default_channel_size() -> usize {
4
}
fn default_timeout() -> Option<std::time::Duration> {
Some(std::time::Duration::from_secs(4))
}
impl BigtablePathInfoService { impl BigtablePathInfoService {
#[cfg(not(test))] #[cfg(not(test))]
pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> { pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> {
@ -412,3 +363,88 @@ impl PathInfoService for BigtablePathInfoService {
Box::pin(stream) Box::pin(stream)
} }
} }
/// Represents configuration of [BigtablePathInfoService].
/// This currently conflates both connect parameters and data model/client
/// behaviour parameters.
///
/// Fields that carry a `#[serde(default …)]` attribute may be omitted when
/// deserializing; all other fields are required.
#[serde_as]
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
pub struct BigtableParameters {
    project_id: String,
    instance_name: String,
    /// Falls back to `false` when omitted.
    #[serde(default)]
    is_read_only: bool,
    /// Falls back to `default_channel_size()` when omitted.
    #[serde(default = "default_channel_size")]
    channel_size: usize,
    /// (De)serialized as a number of seconds carried in a string; falls back
    /// to `default_timeout()` when omitted.
    #[serde_as(as = "Option<DurationSeconds<String>>")]
    #[serde(default = "default_timeout")]
    timeout: Option<std::time::Duration>,
    table_name: String,
    family_name: String,
    /// Falls back to `default_app_profile_id()` when omitted.
    #[serde(default = "default_app_profile_id")]
    app_profile_id: String,
}
impl BigtableParameters {
    /// Parameters with fixed placeholder identifiers, only compiled for tests.
    /// The tunables use the same fallbacks that deserialization applies.
    #[cfg(test)]
    pub fn default_for_tests() -> Self {
        let channel_size = default_channel_size();
        let timeout = default_timeout();
        let app_profile_id = default_app_profile_id();

        Self {
            project_id: String::from("project-1"),
            instance_name: String::from("instance-1"),
            is_read_only: false,
            channel_size,
            timeout,
            table_name: String::from("table-1"),
            family_name: String::from("cf1"),
            app_profile_id,
        }
    }
}
/// Serde fallback for `BigtableParameters::app_profile_id`.
fn default_app_profile_id() -> String {
    String::from("default")
}
/// Serde fallback for `BigtableParameters::channel_size`.
fn default_channel_size() -> usize {
    const CHANNEL_SIZE: usize = 4;
    CHANNEL_SIZE
}
/// Serde fallback for `BigtableParameters::timeout`: four seconds.
fn default_timeout() -> Option<std::time::Duration> {
    let four_seconds = std::time::Duration::from_secs(4);
    Some(four_seconds)
}
#[async_trait]
impl ServiceBuilder for BigtableParameters {
    type Output = dyn PathInfoService;

    /// Connects a [BigtablePathInfoService] using a clone of these
    /// parameters. Neither the instance name nor the composition context is
    /// consulted.
    async fn build<'a>(
        &'a self,
        _instance_name: &str,
        _context: &CompositionContext,
    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync>> {
        let params = self.clone();
        let service = BigtablePathInfoService::connect(params).await?;
        Ok(Arc::new(service))
    }
}
impl TryFrom<url::Url> for BigtableParameters {
type Error = Box<dyn std::error::Error + Send + Sync>;
fn try_from(mut url: url::Url) -> Result<Self, Self::Error> {
// parse the instance name from the hostname.
let instance_name = url
.host_str()
.ok_or_else(|| Error::StorageError("instance name missing".into()))?
.to_string();
// … but add it to the query string now, so we just need to parse that.
url.query_pairs_mut()
.append_pair("instance_name", &instance_name);
let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())
.map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?;
Ok(params)
}
}

View file

@ -1,8 +1,11 @@
use std::sync::Arc;
use crate::proto::PathInfo; use crate::proto::PathInfo;
use futures::stream::BoxStream; use futures::stream::BoxStream;
use nix_compat::nixbase32; use nix_compat::nixbase32;
use tonic::async_trait; use tonic::async_trait;
use tracing::{debug, instrument}; use tracing::{debug, instrument};
use tvix_castore::composition::{CompositionContext, ServiceBuilder};
use tvix_castore::Error; use tvix_castore::Error;
use super::PathInfoService; use super::PathInfoService;
@ -61,6 +64,41 @@ where
} }
} }
/// Configuration for a [Cache] that combines two other PathInfoService
/// instances.
///
/// `near` and `far` hold the names under which those two instances are
/// resolved from the composition context when the service is built.
#[derive(serde::Deserialize)]
pub struct CacheConfig {
    pub near: String,
    pub far: String,
}
impl TryFrom<url::Url> for CacheConfig {
    type Error = Box<dyn std::error::Error + Send + Sync>;

    /// Always fails: a cache refers to two other services by name, which a
    /// URL cannot express.
    ///
    /// # Errors
    ///
    /// Returns [Error::StorageError] for every input.
    fn try_from(_url: url::Url) -> Result<Self, Self::Error> {
        // Name the service the way it is exported (`CachePathInfoService`), so
        // the message points at something that actually exists.
        Err(Error::StorageError(
            "Instantiating a CachePathInfoService from a url is not supported".into(),
        )
        .into())
    }
}
#[async_trait]
impl ServiceBuilder for CacheConfig {
    type Output = dyn PathInfoService;

    /// Resolves the `near` and `far` services by name through the composition
    /// context and wraps them in a [Cache].
    async fn build<'a>(
        &'a self,
        _instance_name: &str,
        context: &CompositionContext,
    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
        let near_name = self.near.clone();
        let far_name = self.far.clone();

        // Both lookups are driven together; a failure of `near` is reported
        // before a failure of `far`.
        let (near, far) =
            futures::join!(context.resolve(near_name), context.resolve(far_name));

        let cache = Cache {
            near: near?,
            far: far?,
        };
        Ok(Arc::new(cache))
    }
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use std::num::NonZeroUsize; use std::num::NonZeroUsize;

View file

@ -1,13 +1,10 @@
use crate::proto::path_info_service_client::PathInfoServiceClient; use super::PathInfoService;
use super::{ use crate::composition::{
GRPCPathInfoService, MemoryPathInfoService, NixHTTPPathInfoService, PathInfoService, with_registry, CompositionContext, DeserializeWithRegistry, ServiceBuilder, REG,
SledPathInfoService,
}; };
use nix_compat::narinfo;
use std::sync::Arc; use std::sync::Arc;
use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error}; use tvix_castore::Error;
use url::Url; use url::Url;
/// Constructs a new instance of a [PathInfoService] from an URI. /// Constructs a new instance of a [PathInfoService] from an URI.
@ -34,113 +31,21 @@ use url::Url;
/// these also need to be passed in. /// these also need to be passed in.
pub async fn from_addr( pub async fn from_addr(
uri: &str, uri: &str,
blob_service: Arc<dyn BlobService>, context: Option<&CompositionContext<'_>>,
directory_service: Arc<dyn DirectoryService>, ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync>> {
) -> Result<Box<dyn PathInfoService>, Error> {
#[allow(unused_mut)] #[allow(unused_mut)]
let mut url = let mut url =
Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?; Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?;
let path_info_service: Box<dyn PathInfoService> = match url.scheme() { let path_info_service_config = with_registry(&REG, || {
"memory" => { <DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>>>::try_from(
// memory doesn't support host or path in the URL. url,
if url.has_host() || !url.path().is_empty() {
return Err(Error::StorageError("invalid url".to_string()));
}
Box::<MemoryPathInfoService>::default()
}
"sled" => {
// sled doesn't support host, and a path can be provided (otherwise
// it'll live in memory only).
if url.has_host() {
return Err(Error::StorageError("no host allowed".to_string()));
}
if url.path() == "/" {
return Err(Error::StorageError(
"cowardly refusing to open / with sled".to_string(),
));
}
// TODO: expose other parameters as URL parameters?
Box::new(if url.path().is_empty() {
SledPathInfoService::new_temporary()
.map_err(|e| Error::StorageError(e.to_string()))?
} else {
SledPathInfoService::new(url.path())
.map_err(|e| Error::StorageError(e.to_string()))?
})
}
"nix+http" | "nix+https" => {
// Stringify the URL and remove the nix+ prefix.
// We can't use `url.set_scheme(rest)`, as it disallows
// setting something http(s) that previously wasn't.
let new_url = Url::parse(url.to_string().strip_prefix("nix+").unwrap()).unwrap();
let mut nix_http_path_info_service =
NixHTTPPathInfoService::new(new_url, blob_service, directory_service);
let pairs = &url.query_pairs();
for (k, v) in pairs.into_iter() {
if k == "trusted-public-keys" {
let pubkey_strs: Vec<_> = v.split_ascii_whitespace().collect();
let mut pubkeys: Vec<narinfo::PubKey> = Vec::with_capacity(pubkey_strs.len());
for pubkey_str in pubkey_strs {
pubkeys.push(narinfo::PubKey::parse(pubkey_str).map_err(|e| {
Error::StorageError(format!("invalid public key: {e}"))
})?);
}
nix_http_path_info_service.set_public_keys(pubkeys);
}
}
Box::new(nix_http_path_info_service)
}
scheme if scheme.starts_with("grpc+") => {
// schemes starting with grpc+ go to the GRPCPathInfoService.
// That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts.
// - In the case of unix sockets, there must be a path, but may not be a host.
// - In the case of non-unix sockets, there must be a host, but no path.
// Constructing the channel is handled by tvix_castore::channel::from_url.
Box::new(GRPCPathInfoService::from_client(
PathInfoServiceClient::with_interceptor(
tvix_castore::tonic::channel_from_url(&url).await?,
tvix_tracing::propagate::tonic::send_trace,
),
))
}
#[cfg(feature = "cloud")]
"bigtable" => {
use super::bigtable::BigtableParameters;
use super::BigtablePathInfoService;
// parse the instance name from the hostname.
let instance_name = url
.host_str()
.ok_or_else(|| Error::StorageError("instance name missing".into()))?
.to_string();
// … but add it to the query string now, so we just need to parse that.
url.query_pairs_mut()
.append_pair("instance_name", &instance_name);
let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())
.map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?;
Box::new(
BigtablePathInfoService::connect(params)
.await
.map_err(|e| Error::StorageError(e.to_string()))?,
) )
} })?
_ => Err(Error::StorageError(format!( .0;
"unknown scheme: {}", let path_info_service = path_info_service_config
url.scheme() .build("anonymous", context.unwrap_or(&CompositionContext::blank()))
)))?, .await?;
};
Ok(path_info_service) Ok(path_info_service)
} }
@ -148,14 +53,12 @@ pub async fn from_addr(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::from_addr; use super::from_addr;
use crate::composition::{Composition, DeserializeWithRegistry, ServiceBuilder};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use rstest::rstest; use rstest::rstest;
use std::sync::Arc;
use tempfile::TempDir; use tempfile::TempDir;
use tvix_castore::{ use tvix_castore::blobservice::{BlobService, MemoryBlobServiceConfig};
blobservice::{BlobService, MemoryBlobService}, use tvix_castore::directoryservice::{DirectoryService, MemoryDirectoryServiceConfig};
directoryservice::{DirectoryService, MemoryDirectoryService},
};
lazy_static! { lazy_static! {
static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap(); static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap();
@ -224,11 +127,19 @@ mod tests {
)] )]
#[tokio::test] #[tokio::test]
async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] exp_succeed: bool) { async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] exp_succeed: bool) {
let blob_service: Arc<dyn BlobService> = Arc::from(MemoryBlobService::default()); let mut comp = Composition::default();
let directory_service: Arc<dyn DirectoryService> = comp.extend(vec![(
Arc::from(MemoryDirectoryService::default()); "default".into(),
DeserializeWithRegistry(Box::new(MemoryBlobServiceConfig {})
as Box<dyn ServiceBuilder<Output = dyn BlobService>>),
)]);
comp.extend(vec![(
"default".into(),
DeserializeWithRegistry(Box::new(MemoryDirectoryServiceConfig {})
as Box<dyn ServiceBuilder<Output = dyn DirectoryService>>),
)]);
let resp = from_addr(uri_str, blob_service, directory_service).await; let resp = from_addr(uri_str, Some(&comp.context())).await;
if exp_succeed { if exp_succeed {
resp.expect("should succeed"); resp.expect("should succeed");

View file

@ -6,9 +6,11 @@ use crate::{
use async_stream::try_stream; use async_stream::try_stream;
use futures::stream::BoxStream; use futures::stream::BoxStream;
use nix_compat::nixbase32; use nix_compat::nixbase32;
use std::sync::Arc;
use tonic::{async_trait, Code}; use tonic::{async_trait, Code};
use tracing::{instrument, Span}; use tracing::{instrument, Span};
use tracing_indicatif::span_ext::IndicatifSpanExt; use tracing_indicatif::span_ext::IndicatifSpanExt;
use tvix_castore::composition::{CompositionContext, ServiceBuilder};
use tvix_castore::{proto as castorepb, Error}; use tvix_castore::{proto as castorepb, Error};
/// Connects to a (remote) tvix-store PathInfoService over gRPC. /// Connects to a (remote) tvix-store PathInfoService over gRPC.
@ -149,6 +151,40 @@ where
} }
} }
/// Configuration for a [GRPCPathInfoService].
///
/// Unknown fields are rejected during deserialization.
#[derive(serde::Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct GRPCPathInfoServiceConfig {
    /// URL of the remote endpoint, kept as a string and parsed when the
    /// service is built.
    url: String,
}
impl TryFrom<url::Url> for GRPCPathInfoServiceConfig {
    type Error = Box<dyn std::error::Error + Send + Sync>;
    /// Stores the stringified URL unchanged; this conversion itself never
    /// returns an error.
    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
        // normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts.
        // - In the case of unix sockets, there must be a path, but may not be a host.
        // - In the case of non-unix sockets, there must be a host, but no path.
        // None of this is checked here: the channel is constructed by
        // tvix_castore::tonic::channel_from_url when the service is built.
        Ok(GRPCPathInfoServiceConfig {
            url: url.to_string(),
        })
    }
}
#[async_trait]
impl ServiceBuilder for GRPCPathInfoServiceConfig {
type Output = dyn PathInfoService;
async fn build<'a>(
&'a self,
_instance_name: &str,
_context: &CompositionContext,
) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
let client = proto::path_info_service_client::PathInfoServiceClient::new(
tvix_castore::tonic::channel_from_url(&self.url.parse()?).await?,
);
Ok(Arc::new(GRPCPathInfoService::from_client(client)))
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::pathinfoservice::tests::make_grpc_path_info_service_client; use crate::pathinfoservice::tests::make_grpc_path_info_service_client;

View file

@ -9,6 +9,7 @@ use tonic::async_trait;
use tracing::instrument; use tracing::instrument;
use crate::proto::PathInfo; use crate::proto::PathInfo;
use tvix_castore::composition::{CompositionContext, ServiceBuilder};
use tvix_castore::Error; use tvix_castore::Error;
use super::PathInfoService; use super::PathInfoService;
@ -60,6 +61,34 @@ impl PathInfoService for LruPathInfoService {
} }
} }
/// Configuration for a [LruPathInfoService].
///
/// Unknown fields are rejected during deserialization.
#[derive(serde::Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct LruPathInfoServiceConfig {
    /// Passed to `LruPathInfoService::with_capacity` when the service is
    /// built; cannot be zero.
    capacity: NonZeroUsize,
}
impl TryFrom<url::Url> for LruPathInfoServiceConfig {
type Error = Box<dyn std::error::Error + Send + Sync>;
fn try_from(_url: url::Url) -> Result<Self, Self::Error> {
Err(Error::StorageError(
"Instantiating a LruPathInfoService from a url is not supported".into(),
)
.into())
}
}
#[async_trait]
impl ServiceBuilder for LruPathInfoServiceConfig {
    type Output = dyn PathInfoService;

    /// Creates a [LruPathInfoService] with the configured capacity. Neither
    /// the instance name nor the composition context is consulted.
    async fn build<'a>(
        &'a self,
        _instance_name: &str,
        _context: &CompositionContext,
    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
        let service = LruPathInfoService::with_capacity(self.capacity);
        Ok(Arc::new(service))
    }
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use std::num::NonZeroUsize; use std::num::NonZeroUsize;

View file

@ -7,6 +7,7 @@ use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tonic::async_trait; use tonic::async_trait;
use tracing::instrument; use tracing::instrument;
use tvix_castore::composition::{CompositionContext, ServiceBuilder};
use tvix_castore::Error; use tvix_castore::Error;
#[derive(Default)] #[derive(Default)]
@ -59,3 +60,30 @@ impl PathInfoService for MemoryPathInfoService {
}) })
} }
} }
/// Configuration for a [MemoryPathInfoService]. It has no options, and
/// unknown fields are rejected during deserialization.
#[derive(serde::Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct MemoryPathInfoServiceConfig {}
impl TryFrom<url::Url> for MemoryPathInfoServiceConfig {
type Error = Box<dyn std::error::Error + Send + Sync>;
fn try_from(url: url::Url) -> Result<Self, Self::Error> {
// memory doesn't support host or path in the URL.
if url.has_host() || !url.path().is_empty() {
return Err(Error::StorageError("invalid url".to_string()).into());
}
Ok(MemoryPathInfoServiceConfig {})
}
}
#[async_trait]
impl ServiceBuilder for MemoryPathInfoServiceConfig {
    type Output = dyn PathInfoService;

    /// Creates a default [MemoryPathInfoService]. Neither the instance name
    /// nor the composition context is consulted.
    async fn build<'a>(
        &'a self,
        _instance_name: &str,
        _context: &CompositionContext,
    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
        let service: Arc<dyn PathInfoService> = Arc::new(MemoryPathInfoService::default());
        Ok(service)
    }
}

View file

@ -14,22 +14,26 @@ mod tests;
use futures::stream::BoxStream; use futures::stream::BoxStream;
use tonic::async_trait; use tonic::async_trait;
use tvix_castore::composition::{Registry, ServiceBuilder};
use tvix_castore::Error; use tvix_castore::Error;
use crate::nar::NarCalculationService;
use crate::proto::PathInfo; use crate::proto::PathInfo;
pub use self::combinators::Cache as CachePathInfoService; pub use self::combinators::{
Cache as CachePathInfoService, CacheConfig as CachePathInfoServiceConfig,
};
pub use self::from_addr::from_addr; pub use self::from_addr::from_addr;
pub use self::grpc::GRPCPathInfoService; pub use self::grpc::{GRPCPathInfoService, GRPCPathInfoServiceConfig};
pub use self::lru::LruPathInfoService; pub use self::lru::{LruPathInfoService, LruPathInfoServiceConfig};
pub use self::memory::MemoryPathInfoService; pub use self::memory::{MemoryPathInfoService, MemoryPathInfoServiceConfig};
pub use self::nix_http::NixHTTPPathInfoService; pub use self::nix_http::{NixHTTPPathInfoService, NixHTTPPathInfoServiceConfig};
pub use self::sled::SledPathInfoService; pub use self::sled::{SledPathInfoService, SledPathInfoServiceConfig};
#[cfg(feature = "cloud")] #[cfg(feature = "cloud")]
mod bigtable; mod bigtable;
#[cfg(feature = "cloud")] #[cfg(feature = "cloud")]
pub use self::bigtable::BigtablePathInfoService; pub use self::bigtable::{BigtableParameters, BigtablePathInfoService};
#[cfg(any(feature = "fuse", feature = "virtiofs"))] #[cfg(any(feature = "fuse", feature = "virtiofs"))]
pub use self::fs::make_fs; pub use self::fs::make_fs;
@ -52,12 +56,16 @@ pub trait PathInfoService: Send + Sync {
/// Rust doesn't support this as a generic in traits yet. This is the same thing that /// Rust doesn't support this as a generic in traits yet. This is the same thing that
/// [async_trait] generates, but for streams instead of futures. /// [async_trait] generates, but for streams instead of futures.
fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>>; fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>>;
/// Optionally hands out a [NarCalculationService]. The default
/// implementation returns `None`.
// NOTE(review): presumably implementors override this when they can
// calculate NARs themselves — confirm against the implementations.
fn nar_calculation_service(&self) -> Option<Box<dyn NarCalculationService>> {
    None
}
} }
#[async_trait] #[async_trait]
impl<A> PathInfoService for A impl<A> PathInfoService for A
where where
A: AsRef<dyn PathInfoService> + Send + Sync, A: AsRef<dyn PathInfoService> + Send + Sync + 'static,
{ {
async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
self.as_ref().get(digest).await self.as_ref().get(digest).await
@ -71,3 +79,19 @@ where
self.as_ref().list() self.as_ref().list()
} }
} }
/// Registers the builtin PathInfoService implementations with the registry
pub(crate) fn register_pathinfo_services(reg: &mut Registry) {
    // Every builtin config is registered as a builder of this trait object.
    type Builder = Box<dyn ServiceBuilder<Output = dyn PathInfoService>>;

    reg.register::<Builder, CachePathInfoServiceConfig>("cache");
    reg.register::<Builder, GRPCPathInfoServiceConfig>("grpc");
    reg.register::<Builder, LruPathInfoServiceConfig>("lru");
    reg.register::<Builder, MemoryPathInfoServiceConfig>("memory");
    reg.register::<Builder, NixHTTPPathInfoServiceConfig>("nix");
    reg.register::<Builder, SledPathInfoServiceConfig>("sled");

    // Bigtable support is only compiled in with the "cloud" feature.
    #[cfg(feature = "cloud")]
    {
        reg.register::<Builder, BigtableParameters>("bigtable");
    }
}

View file

@ -7,12 +7,15 @@ use nix_compat::{
nixhash::NixHash, nixhash::NixHash,
}; };
use reqwest::StatusCode; use reqwest::StatusCode;
use std::sync::Arc;
use tokio::io::{self, AsyncRead}; use tokio::io::{self, AsyncRead};
use tonic::async_trait; use tonic::async_trait;
use tracing::{debug, instrument, warn}; use tracing::{debug, instrument, warn};
use tvix_castore::composition::{CompositionContext, ServiceBuilder};
use tvix_castore::{ use tvix_castore::{
blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error, blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error,
}; };
use url::Url;
/// NixHTTPPathInfoService acts as a bridge in between the Nix HTTP Binary cache /// NixHTTPPathInfoService acts as a bridge in between the Nix HTTP Binary cache
/// protocol provided by Nix binary caches such as cache.nixos.org, and the Tvix /// protocol provided by Nix binary caches such as cache.nixos.org, and the Tvix
@ -249,3 +252,71 @@ where
})) }))
} }
} }
/// Configuration for a [NixHTTPPathInfoService].
#[derive(serde::Deserialize)]
pub struct NixHTTPPathInfoServiceConfig {
    /// URL handed to `NixHTTPPathInfoService::new`; kept as a string and
    /// parsed when the service is built.
    base_url: String,
    /// Name under which the blob service is resolved from the composition
    /// context.
    blob_service: String,
    /// Name under which the directory service is resolved from the
    /// composition context.
    directory_service: String,
    #[serde(default)]
    /// An optional list of [narinfo::PubKey].
    /// If set, the .narinfo files received need to have correct signature by at least one of these.
    public_keys: Option<Vec<String>>,
}
impl TryFrom<Url> for NixHTTPPathInfoServiceConfig {
type Error = Box<dyn std::error::Error + Send + Sync>;
fn try_from(url: Url) -> Result<Self, Self::Error> {
let mut public_keys: Option<Vec<String>> = None;
for (_, v) in url
.query_pairs()
.into_iter()
.filter(|(k, _)| k == "trusted-public-keys")
{
public_keys
.get_or_insert(Default::default())
.extend(v.split_ascii_whitespace().map(ToString::to_string));
}
Ok(NixHTTPPathInfoServiceConfig {
// Stringify the URL and remove the nix+ prefix.
// We can't use `url.set_scheme(rest)`, as it disallows
// setting something http(s) that previously wasn't.
base_url: url.to_string().strip_prefix("nix+").unwrap().to_string(),
blob_service: "default".to_string(),
directory_service: "default".to_string(),
public_keys,
})
}
}
#[async_trait]
impl ServiceBuilder for NixHTTPPathInfoServiceConfig {
type Output = dyn PathInfoService;
async fn build<'a>(
&'a self,
_instance_name: &str,
context: &CompositionContext,
) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
let (blob_service, directory_service) = futures::join!(
context.resolve(self.blob_service.clone()),
context.resolve(self.directory_service.clone())
);
let mut svc = NixHTTPPathInfoService::new(
Url::parse(&self.base_url)?,
blob_service?,
directory_service?,
);
if let Some(public_keys) = &self.public_keys {
svc.set_public_keys(
public_keys
.iter()
.map(|pubkey_str| {
narinfo::PubKey::parse(pubkey_str)
.map_err(|e| Error::StorageError(format!("invalid public key: {e}")))
})
.collect::<Result<Vec<_>, Error>>()?,
);
}
Ok(Arc::new(svc))
}
}

View file

@ -5,8 +5,10 @@ use futures::stream::BoxStream;
use nix_compat::nixbase32; use nix_compat::nixbase32;
use prost::Message; use prost::Message;
use std::path::Path; use std::path::Path;
use std::sync::Arc;
use tonic::async_trait; use tonic::async_trait;
use tracing::{instrument, warn}; use tracing::{instrument, warn};
use tvix_castore::composition::{CompositionContext, ServiceBuilder};
use tvix_castore::Error; use tvix_castore::Error;
/// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled). /// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled).
@ -114,3 +116,69 @@ impl PathInfoService for SledPathInfoService {
}) })
} }
} }
/// Configuration from which a [SledPathInfoService] is built.
///
/// Unknown fields are rejected during deserialization.
#[derive(serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SledPathInfoServiceConfig {
/// When true, the service is created with `SledPathInfoService::new_temporary`
/// and `path` must not be set.
is_temporary: bool,
#[serde(default)]
/// required when is_temporary = false
path: Option<String>,
}
impl TryFrom<url::Url> for SledPathInfoServiceConfig {
    type Error = Box<dyn std::error::Error + Send + Sync>;

    /// Derives a config from a URL.
    ///
    /// A host component is rejected. An empty path yields a temporary
    /// config without a path; any other path is stored and the config is
    /// marked as non-temporary.
    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
        // sled has no notion of a host, so refuse URLs that carry one.
        if url.has_host() {
            return Err(Error::StorageError("no host allowed".to_string()).into());
        }

        // TODO: expose compression and other parameters as URL parameters?

        // No path means the store is temporary.
        let path = match url.path() {
            "" => None,
            given => Some(given.to_string()),
        };

        Ok(SledPathInfoServiceConfig {
            is_temporary: path.is_none(),
            path,
        })
    }
}
#[async_trait]
impl ServiceBuilder for SledPathInfoServiceConfig {
type Output = dyn PathInfoService;
async fn build<'a>(
&'a self,
_instance_name: &str,
_context: &CompositionContext,
) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
match self {
SledPathInfoServiceConfig {
is_temporary: true,
path: None,
} => Ok(Arc::new(SledPathInfoService::new_temporary()?)),
SledPathInfoServiceConfig {
is_temporary: true,
path: Some(_),
} => Err(
Error::StorageError("Temporary SledPathInfoService can not have path".into())
.into(),
),
SledPathInfoServiceConfig {
is_temporary: false,
path: None,
} => Err(Error::StorageError("SledPathInfoService is missing path".into()).into()),
SledPathInfoServiceConfig {
is_temporary: false,
path: Some(path),
} => Ok(Arc::new(SledPathInfoService::new(path)?)),
}
}
}

View file

@ -1,18 +1,60 @@
use std::sync::Arc;
use std::{ use std::{
collections::HashMap,
pin::Pin, pin::Pin,
sync::Arc,
task::{self, Poll}, task::{self, Poll},
}; };
use tokio::io::{self, AsyncWrite}; use tokio::io::{self, AsyncWrite};
use tvix_castore::{ use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
blobservice::{self, BlobService},
directoryservice::{self, DirectoryService},
};
use url::Url; use url::Url;
use crate::composition::{
with_registry, Composition, DeserializeWithRegistry, ServiceBuilder, REG,
};
use crate::nar::{NarCalculationService, SimpleRenderer}; use crate::nar::{NarCalculationService, SimpleRenderer};
use crate::pathinfoservice::{self, PathInfoService}; use crate::pathinfoservice::PathInfoService;
/// Bundles the service builder configs for the three kinds of services
/// used by the store: blob services, directory services and path info
/// services.
///
/// Each map goes from a name to the builder config for one service.
/// The default value has all three maps empty.
#[derive(serde::Deserialize, Default)]
pub struct CompositionConfigs {
/// Builder configs producing a [BlobService], by name.
pub blobservices:
HashMap<String, DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn BlobService>>>>,
/// Builder configs producing a [DirectoryService], by name.
pub directoryservices: HashMap<
String,
DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>>,
>,
/// Builder configs producing a [PathInfoService], by name.
pub pathinfoservices: HashMap<
String,
DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>>,
>,
}
pub fn addrs_to_configs(
blob_service_addr: impl AsRef<str>,
directory_service_addr: impl AsRef<str>,
path_info_service_addr: impl AsRef<str>,
) -> Result<CompositionConfigs, Box<dyn std::error::Error + Send + Sync>> {
let mut configs: CompositionConfigs = Default::default();
let blob_service_url = Url::parse(blob_service_addr.as_ref())?;
let directory_service_url = Url::parse(directory_service_addr.as_ref())?;
let path_info_service_url = Url::parse(path_info_service_addr.as_ref())?;
configs.blobservices.insert(
"default".into(),
with_registry(&REG, || blob_service_url.try_into())?,
);
configs.directoryservices.insert(
"default".into(),
with_registry(&REG, || directory_service_url.try_into())?,
);
configs.pathinfoservices.insert(
"default".into(),
with_registry(&REG, || path_info_service_url.try_into())?,
);
Ok(configs)
}
/// Construct the store handles from their addrs. /// Construct the store handles from their addrs.
pub async fn construct_services( pub async fn construct_services(
@ -23,49 +65,52 @@ pub async fn construct_services(
( (
Arc<dyn BlobService>, Arc<dyn BlobService>,
Arc<dyn DirectoryService>, Arc<dyn DirectoryService>,
Box<dyn PathInfoService>, Arc<dyn PathInfoService>,
Box<dyn NarCalculationService>, Box<dyn NarCalculationService>,
), ),
Box<dyn std::error::Error + Send + Sync>, Box<dyn std::error::Error + Send + Sync>,
> { > {
let blob_service: Arc<dyn BlobService> = let configs = addrs_to_configs(
blobservice::from_addr(blob_service_addr.as_ref()).await?; blob_service_addr,
let directory_service: Arc<dyn DirectoryService> = directory_service_addr,
directoryservice::from_addr(directory_service_addr.as_ref()).await?; path_info_service_addr,
)?;
construct_services_from_configs(configs).await
}
let path_info_service = pathinfoservice::from_addr( /// Construct the store handles from their addrs.
path_info_service_addr.as_ref(), pub async fn construct_services_from_configs(
blob_service.clone(), configs: CompositionConfigs,
directory_service.clone(), ) -> Result<
) (
.await?; Arc<dyn BlobService>,
Arc<dyn DirectoryService>,
Arc<dyn PathInfoService>,
Box<dyn NarCalculationService>,
),
Box<dyn std::error::Error + Send + Sync>,
> {
let mut comp = Composition::default();
comp.extend(configs.blobservices);
comp.extend(configs.directoryservices);
comp.extend(configs.pathinfoservices);
let blob_service: Arc<dyn BlobService> = comp.build("default").await?;
let directory_service: Arc<dyn DirectoryService> = comp.build("default").await?;
let path_info_service: Arc<dyn PathInfoService> = comp.build("default").await?;
// HACK: The grpc client also implements NarCalculationService, and we // HACK: The grpc client also implements NarCalculationService, and we
// really want to use it (otherwise we'd need to fetch everything again for hashing). // really want to use it (otherwise we'd need to fetch everything again for hashing).
// Until we revamped store composition and config, detect this special case here. // Until we revamped store composition and config, detect this special case here.
let nar_calculation_service: Box<dyn NarCalculationService> = { let nar_calculation_service: Box<dyn NarCalculationService> = path_info_service
use crate::pathinfoservice::GRPCPathInfoService; .nar_calculation_service()
use crate::proto::path_info_service_client::PathInfoServiceClient; .unwrap_or_else(|| {
let url = Url::parse(path_info_service_addr.as_ref())
.map_err(|e| io::Error::other(e.to_string()))?;
if url.scheme().starts_with("grpc+") {
Box::new(GRPCPathInfoService::from_client(
PathInfoServiceClient::with_interceptor(
tvix_castore::tonic::channel_from_url(&url)
.await
.map_err(|e| io::Error::other(e.to_string()))?,
tvix_tracing::propagate::tonic::send_trace,
),
))
} else {
Box::new(SimpleRenderer::new( Box::new(SimpleRenderer::new(
blob_service.clone(), blob_service.clone(),
directory_service.clone(), directory_service.clone(),
)) as Box<dyn NarCalculationService> ))
} });
};
Ok(( Ok((
blob_service, blob_service,