From 4801d6bf85ebb47910dd4bfe9db11846ff438572 Mon Sep 17 00:00:00 2001 From: Yureka Date: Fri, 19 Jul 2024 10:51:05 +0200 Subject: [PATCH] feat(tvix): different service types in one composition This will be necessary for the PathInfoService composition, as some PathInfoService implementations require a BlobService & DirectoryService to ingest into. Using the Extend trait for creating compositions allows extending the same composition with configs of various types e.g. BlobStore, DirectoryStore Generics are moved from the Composition struct to the functions.The storage of the InstantiatonStates uses the TypeId in the key and a Box in the value, which is downcasted to InstantiatonState. Change-Id: I2af11f26c535029adfb1c62905e0e7c4aaed7b51 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11980 Reviewed-by: flokli Reviewed-by: raitobezarius Tested-by: BuildkiteCI Autosubmit: yuka --- tvix/castore/src/blobservice/combinator.rs | 2 +- tvix/castore/src/blobservice/grpc.rs | 2 +- tvix/castore/src/blobservice/memory.rs | 2 +- tvix/castore/src/blobservice/object_store.rs | 2 +- tvix/castore/src/composition.rs | 90 +++++++++++-------- tvix/castore/src/directoryservice/bigtable.rs | 2 +- .../src/directoryservice/combinators.rs | 2 +- tvix/castore/src/directoryservice/grpc.rs | 2 +- tvix/castore/src/directoryservice/memory.rs | 2 +- .../src/directoryservice/object_store.rs | 2 +- tvix/castore/src/directoryservice/sled.rs | 2 +- 11 files changed, 64 insertions(+), 46 deletions(-) diff --git a/tvix/castore/src/blobservice/combinator.rs b/tvix/castore/src/blobservice/combinator.rs index 8ec5a859b..fc33d16a3 100644 --- a/tvix/castore/src/blobservice/combinator.rs +++ b/tvix/castore/src/blobservice/combinator.rs @@ -119,7 +119,7 @@ impl ServiceBuilder for CombinedBlobServiceConfig { async fn build<'a>( &'a self, _instance_name: &str, - context: &CompositionContext, + context: &CompositionContext, ) -> Result, Box> { let (local, remote) = futures::join!( context.resolve(self.local.clone()), diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index f5705adbf..0db3dfea4 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -206,7 +206,7 @@ impl ServiceBuilder for GRPCBlobServiceConfig { async fn build<'a>( &'a self, _instance_name: &str, - _context: &CompositionContext, + _context: &CompositionContext, ) -> Result, Box> { let client = proto::blob_service_client::BlobServiceClient::new( crate::tonic::channel_from_url(&self.url.parse()?).await?, diff --git a/tvix/castore/src/blobservice/memory.rs b/tvix/castore/src/blobservice/memory.rs index 83b37edb1..3d733f950 100644 --- a/tvix/castore/src/blobservice/memory.rs +++ b/tvix/castore/src/blobservice/memory.rs @@ -59,7 +59,7 @@ impl ServiceBuilder for MemoryBlobServiceConfig { async fn build<'a>( &'a self, _instance_name: &str, - _context: &CompositionContext, + _context: &CompositionContext, ) -> Result, Box> { Ok(Arc::new(MemoryBlobService::default())) } diff --git a/tvix/castore/src/blobservice/object_store.rs b/tvix/castore/src/blobservice/object_store.rs index d898ce19e..5bb05cf26 100644 --- a/tvix/castore/src/blobservice/object_store.rs +++ b/tvix/castore/src/blobservice/object_store.rs @@ -295,7 +295,7 @@ impl ServiceBuilder for ObjectStoreBlobServiceConfig { async fn build<'a>( &'a self, _instance_name: &str, - _context: &CompositionContext, + _context: &CompositionContext, ) -> Result, Box> { let (object_store, path) = object_store::parse_url_opts( &self.object_store_url.parse()?, diff --git a/tvix/castore/src/composition.rs b/tvix/castore/src/composition.rs index 9e7b3712f..18a767284 100644 --- a/tvix/castore/src/composition.rs +++ b/tvix/castore/src/composition.rs @@ -26,7 +26,7 @@ //! #[tonic::async_trait] //! impl ServiceBuilder for MyBlobServiceConfig { //! type Output = dyn BlobService; -//! async fn build(&self, _: &str, _: &CompositionContext) -> Result, Box> { +//! async fn build(&self, _: &str, _: &CompositionContext) -> Result, Box> { //! todo!() //! } //! } @@ -49,6 +49,7 @@ //! ### Example 2.: Composing stores to get one store //! //! ``` +//! use std::sync::Arc; //! use tvix_castore::composition::*; //! use tvix_castore::blobservice::BlobService; //! @@ -69,8 +70,9 @@ //! }); //! //! let blob_services_configs = with_registry(®, || serde_json::from_value(blob_services_configs_json))?; -//! let blob_service_composition = Composition::::from_configs(blob_services_configs); -//! let blob_service = blob_service_composition.build("default").await?; +//! let mut blob_service_composition = Composition::default(); +//! blob_service_composition.extend_with_configs::(blob_services_configs); +//! let blob_service: Arc = blob_service_composition.build("default").await?; //! # Ok(()) //! # }) //! # } @@ -271,12 +273,12 @@ pub fn add_default_services(reg: &mut Registry) { crate::directoryservice::register_directory_services(reg); } -pub struct CompositionContext<'a, T: ?Sized> { +pub struct CompositionContext<'a> { stack: Vec, - composition: Option<&'a Composition>, + composition: Option<&'a Composition>, } -impl<'a, T: ?Sized + Send + Sync + 'static> CompositionContext<'a, T> { +impl<'a> CompositionContext<'a> { pub fn blank() -> Self { Self { stack: Default::default(), @@ -284,7 +286,7 @@ impl<'a, T: ?Sized + Send + Sync + 'static> CompositionContext<'a, T> { } } - pub async fn resolve( + pub async fn resolve( &self, entrypoint: String, ) -> Result, Box> { @@ -307,7 +309,7 @@ pub trait ServiceBuilder: Send + Sync { async fn build( &self, instance_name: &str, - context: &CompositionContext, + context: &CompositionContext, ) -> Result, Box>; } @@ -325,8 +327,9 @@ enum InstantiationState { Done(Result, CompositionError>), } -pub struct Composition { - stores: std::sync::Mutex>>, +#[derive(Default)] +pub struct Composition { + stores: std::sync::Mutex>>, } #[derive(thiserror::Error, Clone, Debug)] @@ -341,44 +344,57 @@ pub enum CompositionError { Failed(String, Arc), } -impl Composition { - pub fn from_configs( - // Keep the concrete `HashMap` type here since it allows for type - // inference of what type is previously being deserialized. - configs: HashMap>>>, - ) -> Self { - Self::from_configs_iter(configs) - } - - pub fn from_configs_iter( - configs: impl IntoIterator< +impl + Extend<( + String, + DeserializeWithRegistry>>, + )> for Composition +{ + fn extend(&mut self, configs: I) + where + I: IntoIterator< Item = ( String, DeserializeWithRegistry>>, ), >, - ) -> Self { - Composition { - stores: std::sync::Mutex::new( - configs - .into_iter() - .map(|(k, v)| (k, InstantiationState::Config(v.0))) - .collect(), - ), - } + { + self.stores + .lock() + .unwrap() + .extend(configs.into_iter().map(|(k, v)| { + ( + (TypeId::of::(), k), + Box::new(InstantiationState::Config(v.0)) as Box, + ) + })) + } +} + +impl Composition { + pub fn extend_with_configs( + &mut self, + // Keep the concrete `HashMap` type here since it allows for type + // inference of what type is previously being deserialized. + configs: HashMap>>>, + ) { + self.extend(configs); } - pub async fn build(&self, entrypoint: &str) -> Result, CompositionError> { + pub async fn build( + &self, + entrypoint: &str, + ) -> Result, CompositionError> { self.build_internal(vec![], entrypoint.to_string()).await } - fn build_internal( + fn build_internal( &self, stack: Vec, entrypoint: String, ) -> BoxFuture<'_, Result, CompositionError>> { let mut stores = self.stores.lock().unwrap(); - let entry = match stores.get_mut(&entrypoint) { + let entry = match stores.get_mut(&(TypeId::of::(), entrypoint.clone())) { Some(v) => v, None => return Box::pin(futures::future::err(CompositionError::NotFound(entrypoint))), }; @@ -387,9 +403,11 @@ impl Composition { // this temporary value. let prev_val = std::mem::replace( entry, - InstantiationState::Done(Err(CompositionError::Poisoned(entrypoint.clone()))), + Box::new(InstantiationState::::Done(Err( + CompositionError::Poisoned(entrypoint.clone()), + ))), ); - let (new_val, ret) = match prev_val { + let (new_val, ret) = match *prev_val.downcast::>().unwrap() { InstantiationState::Done(service) => ( InstantiationState::Done(service.clone()), futures::future::ready(service).boxed(), @@ -433,7 +451,7 @@ impl Composition { }) } }; - *entry = new_val; + *entry = Box::new(new_val); ret } } diff --git a/tvix/castore/src/directoryservice/bigtable.rs b/tvix/castore/src/directoryservice/bigtable.rs index 596094930..d10dddaf9 100644 --- a/tvix/castore/src/directoryservice/bigtable.rs +++ b/tvix/castore/src/directoryservice/bigtable.rs @@ -353,7 +353,7 @@ impl ServiceBuilder for BigtableParameters { async fn build<'a>( &'a self, _instance_name: &str, - _context: &CompositionContext, + _context: &CompositionContext, ) -> Result, Box> { Ok(Arc::new( BigtableDirectoryService::connect(self.clone()).await?, diff --git a/tvix/castore/src/directoryservice/combinators.rs b/tvix/castore/src/directoryservice/combinators.rs index 74d02f1ad..0fdc82c16 100644 --- a/tvix/castore/src/directoryservice/combinators.rs +++ b/tvix/castore/src/directoryservice/combinators.rs @@ -167,7 +167,7 @@ impl ServiceBuilder for CacheConfig { async fn build<'a>( &'a self, _instance_name: &str, - context: &CompositionContext, + context: &CompositionContext, ) -> Result, Box> { let (near, far) = futures::join!( context.resolve(self.near.clone()), diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index 415796fa5..ff08bad4b 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -243,7 +243,7 @@ impl ServiceBuilder for GRPCDirectoryServiceConfig { async fn build<'a>( &'a self, _instance_name: &str, - _context: &CompositionContext, + _context: &CompositionContext, ) -> Result, Box> { let client = proto::directory_service_client::DirectoryServiceClient::new( crate::tonic::channel_from_url(&self.url.parse()?).await?, diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs index c1fc361f0..ada4606a5 100644 --- a/tvix/castore/src/directoryservice/memory.rs +++ b/tvix/castore/src/directoryservice/memory.rs @@ -108,7 +108,7 @@ impl ServiceBuilder for MemoryDirectoryServiceConfig { async fn build<'a>( &'a self, _instance_name: &str, - _context: &CompositionContext, + _context: &CompositionContext, ) -> Result, Box> { Ok(Arc::new(MemoryDirectoryService::default())) } diff --git a/tvix/castore/src/directoryservice/object_store.rs b/tvix/castore/src/directoryservice/object_store.rs index 0f0423a49..a9a2cc8ef 100644 --- a/tvix/castore/src/directoryservice/object_store.rs +++ b/tvix/castore/src/directoryservice/object_store.rs @@ -211,7 +211,7 @@ impl ServiceBuilder for ObjectStoreDirectoryServiceConfig { async fn build<'a>( &'a self, _instance_name: &str, - _context: &CompositionContext, + _context: &CompositionContext, ) -> Result, Box> { let (object_store, path) = object_store::parse_url_opts( &self.object_store_url.parse()?, diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs index 61058b392..5766dec1a 100644 --- a/tvix/castore/src/directoryservice/sled.rs +++ b/tvix/castore/src/directoryservice/sled.rs @@ -176,7 +176,7 @@ impl ServiceBuilder for SledDirectoryServiceConfig { async fn build<'a>( &'a self, _instance_name: &str, - _context: &CompositionContext, + _context: &CompositionContext, ) -> Result, Box> { match self { SledDirectoryServiceConfig {