feat(tvix): different service types in one composition
This will be necessary for the PathInfoService composition, as some PathInfoService implementations require a BlobService & DirectoryService to ingest into. Using the Extend trait for creating compositions allows extending the same composition with configs of various types e.g. BlobStore, DirectoryStore Generics are moved from the Composition struct to the functions.The storage of the InstantiatonStates uses the TypeId in the key and a Box<dyn Any> in the value, which is downcasted to InstantiatonState<T>. Change-Id: I2af11f26c535029adfb1c62905e0e7c4aaed7b51 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11980 Reviewed-by: flokli <flokli@flokli.de> Reviewed-by: raitobezarius <tvl@lahfa.xyz> Tested-by: BuildkiteCI Autosubmit: yuka <yuka@yuka.dev>
This commit is contained in:
parent
7ccdf6dad5
commit
4801d6bf85
11 changed files with 64 additions and 46 deletions
|
@ -119,7 +119,7 @@ impl ServiceBuilder for CombinedBlobServiceConfig {
|
||||||
async fn build<'a>(
|
async fn build<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
_instance_name: &str,
|
_instance_name: &str,
|
||||||
context: &CompositionContext<dyn BlobService>,
|
context: &CompositionContext,
|
||||||
) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync>> {
|
||||||
let (local, remote) = futures::join!(
|
let (local, remote) = futures::join!(
|
||||||
context.resolve(self.local.clone()),
|
context.resolve(self.local.clone()),
|
||||||
|
|
|
@ -206,7 +206,7 @@ impl ServiceBuilder for GRPCBlobServiceConfig {
|
||||||
async fn build<'a>(
|
async fn build<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
_instance_name: &str,
|
_instance_name: &str,
|
||||||
_context: &CompositionContext<dyn BlobService>,
|
_context: &CompositionContext,
|
||||||
) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
|
) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
|
||||||
let client = proto::blob_service_client::BlobServiceClient::new(
|
let client = proto::blob_service_client::BlobServiceClient::new(
|
||||||
crate::tonic::channel_from_url(&self.url.parse()?).await?,
|
crate::tonic::channel_from_url(&self.url.parse()?).await?,
|
||||||
|
|
|
@ -59,7 +59,7 @@ impl ServiceBuilder for MemoryBlobServiceConfig {
|
||||||
async fn build<'a>(
|
async fn build<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
_instance_name: &str,
|
_instance_name: &str,
|
||||||
_context: &CompositionContext<dyn BlobService>,
|
_context: &CompositionContext,
|
||||||
) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
|
) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
|
||||||
Ok(Arc::new(MemoryBlobService::default()))
|
Ok(Arc::new(MemoryBlobService::default()))
|
||||||
}
|
}
|
||||||
|
|
|
@ -295,7 +295,7 @@ impl ServiceBuilder for ObjectStoreBlobServiceConfig {
|
||||||
async fn build<'a>(
|
async fn build<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
_instance_name: &str,
|
_instance_name: &str,
|
||||||
_context: &CompositionContext<dyn BlobService>,
|
_context: &CompositionContext,
|
||||||
) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
|
) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
|
||||||
let (object_store, path) = object_store::parse_url_opts(
|
let (object_store, path) = object_store::parse_url_opts(
|
||||||
&self.object_store_url.parse()?,
|
&self.object_store_url.parse()?,
|
||||||
|
|
|
@ -26,7 +26,7 @@
|
||||||
//! #[tonic::async_trait]
|
//! #[tonic::async_trait]
|
||||||
//! impl ServiceBuilder for MyBlobServiceConfig {
|
//! impl ServiceBuilder for MyBlobServiceConfig {
|
||||||
//! type Output = dyn BlobService;
|
//! type Output = dyn BlobService;
|
||||||
//! async fn build(&self, _: &str, _: &CompositionContext<Self::Output>) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync + 'static>> {
|
//! async fn build(&self, _: &str, _: &CompositionContext) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync + 'static>> {
|
||||||
//! todo!()
|
//! todo!()
|
||||||
//! }
|
//! }
|
||||||
//! }
|
//! }
|
||||||
|
@ -49,6 +49,7 @@
|
||||||
//! ### Example 2.: Composing stores to get one store
|
//! ### Example 2.: Composing stores to get one store
|
||||||
//!
|
//!
|
||||||
//! ```
|
//! ```
|
||||||
|
//! use std::sync::Arc;
|
||||||
//! use tvix_castore::composition::*;
|
//! use tvix_castore::composition::*;
|
||||||
//! use tvix_castore::blobservice::BlobService;
|
//! use tvix_castore::blobservice::BlobService;
|
||||||
//!
|
//!
|
||||||
|
@ -69,8 +70,9 @@
|
||||||
//! });
|
//! });
|
||||||
//!
|
//!
|
||||||
//! let blob_services_configs = with_registry(®, || serde_json::from_value(blob_services_configs_json))?;
|
//! let blob_services_configs = with_registry(®, || serde_json::from_value(blob_services_configs_json))?;
|
||||||
//! let blob_service_composition = Composition::<dyn BlobService>::from_configs(blob_services_configs);
|
//! let mut blob_service_composition = Composition::default();
|
||||||
//! let blob_service = blob_service_composition.build("default").await?;
|
//! blob_service_composition.extend_with_configs::<dyn BlobService>(blob_services_configs);
|
||||||
|
//! let blob_service: Arc<dyn BlobService> = blob_service_composition.build("default").await?;
|
||||||
//! # Ok(())
|
//! # Ok(())
|
||||||
//! # })
|
//! # })
|
||||||
//! # }
|
//! # }
|
||||||
|
@ -271,12 +273,12 @@ pub fn add_default_services(reg: &mut Registry) {
|
||||||
crate::directoryservice::register_directory_services(reg);
|
crate::directoryservice::register_directory_services(reg);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct CompositionContext<'a, T: ?Sized> {
|
pub struct CompositionContext<'a> {
|
||||||
stack: Vec<String>,
|
stack: Vec<String>,
|
||||||
composition: Option<&'a Composition<T>>,
|
composition: Option<&'a Composition>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, T: ?Sized + Send + Sync + 'static> CompositionContext<'a, T> {
|
impl<'a> CompositionContext<'a> {
|
||||||
pub fn blank() -> Self {
|
pub fn blank() -> Self {
|
||||||
Self {
|
Self {
|
||||||
stack: Default::default(),
|
stack: Default::default(),
|
||||||
|
@ -284,7 +286,7 @@ impl<'a, T: ?Sized + Send + Sync + 'static> CompositionContext<'a, T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn resolve(
|
pub async fn resolve<T: ?Sized + Send + Sync + 'static>(
|
||||||
&self,
|
&self,
|
||||||
entrypoint: String,
|
entrypoint: String,
|
||||||
) -> Result<Arc<T>, Box<dyn std::error::Error + Send + Sync + 'static>> {
|
) -> Result<Arc<T>, Box<dyn std::error::Error + Send + Sync + 'static>> {
|
||||||
|
@ -307,7 +309,7 @@ pub trait ServiceBuilder: Send + Sync {
|
||||||
async fn build(
|
async fn build(
|
||||||
&self,
|
&self,
|
||||||
instance_name: &str,
|
instance_name: &str,
|
||||||
context: &CompositionContext<Self::Output>,
|
context: &CompositionContext,
|
||||||
) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync + 'static>>;
|
) -> Result<Arc<Self::Output>, Box<dyn std::error::Error + Send + Sync + 'static>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -325,8 +327,9 @@ enum InstantiationState<T: ?Sized> {
|
||||||
Done(Result<Arc<T>, CompositionError>),
|
Done(Result<Arc<T>, CompositionError>),
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Composition<T: ?Sized> {
|
#[derive(Default)]
|
||||||
stores: std::sync::Mutex<HashMap<String, InstantiationState<T>>>,
|
pub struct Composition {
|
||||||
|
stores: std::sync::Mutex<HashMap<(TypeId, String), Box<dyn Any + Send + Sync>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(thiserror::Error, Clone, Debug)]
|
#[derive(thiserror::Error, Clone, Debug)]
|
||||||
|
@ -341,44 +344,57 @@ pub enum CompositionError {
|
||||||
Failed(String, Arc<dyn std::error::Error + Send + Sync>),
|
Failed(String, Arc<dyn std::error::Error + Send + Sync>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: ?Sized + Send + Sync + 'static> Composition<T> {
|
impl<T: ?Sized + Send + Sync + 'static>
|
||||||
pub fn from_configs(
|
Extend<(
|
||||||
// Keep the concrete `HashMap` type here since it allows for type
|
String,
|
||||||
// inference of what type is previously being deserialized.
|
DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = T>>>,
|
||||||
configs: HashMap<String, DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = T>>>>,
|
)> for Composition
|
||||||
) -> Self {
|
{
|
||||||
Self::from_configs_iter(configs)
|
fn extend<I>(&mut self, configs: I)
|
||||||
}
|
where
|
||||||
|
I: IntoIterator<
|
||||||
pub fn from_configs_iter(
|
|
||||||
configs: impl IntoIterator<
|
|
||||||
Item = (
|
Item = (
|
||||||
String,
|
String,
|
||||||
DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = T>>>,
|
DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = T>>>,
|
||||||
),
|
),
|
||||||
>,
|
>,
|
||||||
) -> Self {
|
{
|
||||||
Composition {
|
self.stores
|
||||||
stores: std::sync::Mutex::new(
|
.lock()
|
||||||
configs
|
.unwrap()
|
||||||
.into_iter()
|
.extend(configs.into_iter().map(|(k, v)| {
|
||||||
.map(|(k, v)| (k, InstantiationState::Config(v.0)))
|
(
|
||||||
.collect(),
|
(TypeId::of::<T>(), k),
|
||||||
),
|
Box::new(InstantiationState::Config(v.0)) as Box<dyn Any + Send + Sync>,
|
||||||
|
)
|
||||||
|
}))
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Composition {
|
||||||
|
pub fn extend_with_configs<T: ?Sized + Send + Sync + 'static>(
|
||||||
|
&mut self,
|
||||||
|
// Keep the concrete `HashMap` type here since it allows for type
|
||||||
|
// inference of what type is previously being deserialized.
|
||||||
|
configs: HashMap<String, DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = T>>>>,
|
||||||
|
) {
|
||||||
|
self.extend(configs);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn build(&self, entrypoint: &str) -> Result<Arc<T>, CompositionError> {
|
pub async fn build<T: ?Sized + Send + Sync + 'static>(
|
||||||
|
&self,
|
||||||
|
entrypoint: &str,
|
||||||
|
) -> Result<Arc<T>, CompositionError> {
|
||||||
self.build_internal(vec![], entrypoint.to_string()).await
|
self.build_internal(vec![], entrypoint.to_string()).await
|
||||||
}
|
}
|
||||||
|
|
||||||
fn build_internal(
|
fn build_internal<T: ?Sized + Send + Sync + 'static>(
|
||||||
&self,
|
&self,
|
||||||
stack: Vec<String>,
|
stack: Vec<String>,
|
||||||
entrypoint: String,
|
entrypoint: String,
|
||||||
) -> BoxFuture<'_, Result<Arc<T>, CompositionError>> {
|
) -> BoxFuture<'_, Result<Arc<T>, CompositionError>> {
|
||||||
let mut stores = self.stores.lock().unwrap();
|
let mut stores = self.stores.lock().unwrap();
|
||||||
let entry = match stores.get_mut(&entrypoint) {
|
let entry = match stores.get_mut(&(TypeId::of::<T>(), entrypoint.clone())) {
|
||||||
Some(v) => v,
|
Some(v) => v,
|
||||||
None => return Box::pin(futures::future::err(CompositionError::NotFound(entrypoint))),
|
None => return Box::pin(futures::future::err(CompositionError::NotFound(entrypoint))),
|
||||||
};
|
};
|
||||||
|
@ -387,9 +403,11 @@ impl<T: ?Sized + Send + Sync + 'static> Composition<T> {
|
||||||
// this temporary value.
|
// this temporary value.
|
||||||
let prev_val = std::mem::replace(
|
let prev_val = std::mem::replace(
|
||||||
entry,
|
entry,
|
||||||
InstantiationState::Done(Err(CompositionError::Poisoned(entrypoint.clone()))),
|
Box::new(InstantiationState::<T>::Done(Err(
|
||||||
|
CompositionError::Poisoned(entrypoint.clone()),
|
||||||
|
))),
|
||||||
);
|
);
|
||||||
let (new_val, ret) = match prev_val {
|
let (new_val, ret) = match *prev_val.downcast::<InstantiationState<T>>().unwrap() {
|
||||||
InstantiationState::Done(service) => (
|
InstantiationState::Done(service) => (
|
||||||
InstantiationState::Done(service.clone()),
|
InstantiationState::Done(service.clone()),
|
||||||
futures::future::ready(service).boxed(),
|
futures::future::ready(service).boxed(),
|
||||||
|
@ -433,7 +451,7 @@ impl<T: ?Sized + Send + Sync + 'static> Composition<T> {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
*entry = new_val;
|
*entry = Box::new(new_val);
|
||||||
ret
|
ret
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -353,7 +353,7 @@ impl ServiceBuilder for BigtableParameters {
|
||||||
async fn build<'a>(
|
async fn build<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
_instance_name: &str,
|
_instance_name: &str,
|
||||||
_context: &CompositionContext<dyn DirectoryService>,
|
_context: &CompositionContext,
|
||||||
) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync>> {
|
||||||
Ok(Arc::new(
|
Ok(Arc::new(
|
||||||
BigtableDirectoryService::connect(self.clone()).await?,
|
BigtableDirectoryService::connect(self.clone()).await?,
|
||||||
|
|
|
@ -167,7 +167,7 @@ impl ServiceBuilder for CacheConfig {
|
||||||
async fn build<'a>(
|
async fn build<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
_instance_name: &str,
|
_instance_name: &str,
|
||||||
context: &CompositionContext<dyn DirectoryService>,
|
context: &CompositionContext,
|
||||||
) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
|
) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
|
||||||
let (near, far) = futures::join!(
|
let (near, far) = futures::join!(
|
||||||
context.resolve(self.near.clone()),
|
context.resolve(self.near.clone()),
|
||||||
|
|
|
@ -243,7 +243,7 @@ impl ServiceBuilder for GRPCDirectoryServiceConfig {
|
||||||
async fn build<'a>(
|
async fn build<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
_instance_name: &str,
|
_instance_name: &str,
|
||||||
_context: &CompositionContext<dyn DirectoryService>,
|
_context: &CompositionContext,
|
||||||
) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
|
) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
|
||||||
let client = proto::directory_service_client::DirectoryServiceClient::new(
|
let client = proto::directory_service_client::DirectoryServiceClient::new(
|
||||||
crate::tonic::channel_from_url(&self.url.parse()?).await?,
|
crate::tonic::channel_from_url(&self.url.parse()?).await?,
|
||||||
|
|
|
@ -108,7 +108,7 @@ impl ServiceBuilder for MemoryDirectoryServiceConfig {
|
||||||
async fn build<'a>(
|
async fn build<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
_instance_name: &str,
|
_instance_name: &str,
|
||||||
_context: &CompositionContext<dyn DirectoryService>,
|
_context: &CompositionContext,
|
||||||
) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
|
) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
|
||||||
Ok(Arc::new(MemoryDirectoryService::default()))
|
Ok(Arc::new(MemoryDirectoryService::default()))
|
||||||
}
|
}
|
||||||
|
|
|
@ -211,7 +211,7 @@ impl ServiceBuilder for ObjectStoreDirectoryServiceConfig {
|
||||||
async fn build<'a>(
|
async fn build<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
_instance_name: &str,
|
_instance_name: &str,
|
||||||
_context: &CompositionContext<dyn DirectoryService>,
|
_context: &CompositionContext,
|
||||||
) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
|
) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
|
||||||
let (object_store, path) = object_store::parse_url_opts(
|
let (object_store, path) = object_store::parse_url_opts(
|
||||||
&self.object_store_url.parse()?,
|
&self.object_store_url.parse()?,
|
||||||
|
|
|
@ -176,7 +176,7 @@ impl ServiceBuilder for SledDirectoryServiceConfig {
|
||||||
async fn build<'a>(
|
async fn build<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
_instance_name: &str,
|
_instance_name: &str,
|
||||||
_context: &CompositionContext<dyn DirectoryService>,
|
_context: &CompositionContext,
|
||||||
) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
|
) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
|
||||||
match self {
|
match self {
|
||||||
SledDirectoryServiceConfig {
|
SledDirectoryServiceConfig {
|
||||||
|
|
Loading…
Reference in a new issue