feat(tvix/castore/directoryservice): add Router combinator

Change-Id: Iea945a119bfc70d5f089506fcde7d7a466496808
This commit is contained in:
Yureka 2024-07-22 00:42:31 +02:00 committed by sinavir
parent 3b16bacb8c
commit 85fd360bfa
2 changed files with 74 additions and 1 deletions

View file

@ -178,3 +178,75 @@ impl ServiceBuilder for CacheConfig {
}))
}
}
#[derive(Clone)]
pub struct Router<DS1, DS2> {
writes: DS1,
reads: DS2,
}
#[async_trait]
impl<DS1, DS2> DirectoryService for Router<DS1, DS2>
where
DS1: DirectoryService + Clone + 'static,
DS2: DirectoryService + Clone + 'static,
{
#[instrument(skip(self, digest), fields(directory.digest = %digest))]
async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
self.reads.get(digest).await
}
#[instrument(skip_all)]
async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
self.writes.put(directory).await
}
#[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
fn get_recursive(
&self,
root_directory_digest: &B3Digest,
) -> BoxStream<'static, Result<Directory, Error>> {
self.reads.get_recursive(root_directory_digest)
}
#[instrument(skip_all)]
fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> {
self.writes.put_multiple_start()
}
}
#[derive(serde::Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct RouterConfig {
writes: String,
reads: String,
}
impl TryFrom<url::Url> for RouterConfig {
type Error = Box<dyn std::error::Error + Send + Sync>;
fn try_from(_url: url::Url) -> Result<Self, Self::Error> {
Err(Error::StorageError(
"Instantiating a CombinedDirectoryService from a url is not supported".into(),
)
.into())
}
}
#[async_trait]
impl ServiceBuilder for RouterConfig {
type Output = dyn DirectoryService;
async fn build<'a>(
&'a self,
_instance_name: &str,
context: &CompositionContext,
) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
let (writes, reads) = futures::join!(
context.resolve(self.writes.clone()),
context.resolve(self.reads.clone())
);
Ok(Arc::new(Router {
writes: writes?,
reads: reads?,
}))
}
}

View file

@ -18,7 +18,7 @@ pub mod tests;
mod traverse;
mod utils;
pub use self::combinators::{Cache, CacheConfig};
pub use self::combinators::{Cache, CacheConfig, Router, RouterConfig};
pub use self::directory_graph::{DirectoryGraph, ValidatedDirectoryGraph};
pub use self::from_addr::from_addr;
pub use self::grpc::{GRPCDirectoryService, GRPCDirectoryServiceConfig};
@ -137,6 +137,7 @@ pub(crate) fn register_directory_services(reg: &mut Registry) {
reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::ObjectStoreDirectoryServiceConfig>("objectstore");
reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::MemoryDirectoryServiceConfig>("memory");
reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::CacheConfig>("cache");
reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::RouterConfig>("router");
reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::GRPCDirectoryServiceConfig>("grpc");
reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::SledDirectoryServiceConfig>("sled");
reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::RedbDirectoryServiceConfig>("redb");