feat(tvix/composition): allow urls as anonymous stores

This allows specifying an url in place of a named reference to another
composition entry, if the castore crate has been compiled with the
 xp-store-composition feature.

Example: `--directory-service-addr cache://?near=memory://&far=memory://`

This would be equivalent to the instantiation via toml file:

```toml
[memory1]
type = "memory"

[memory2]
type = "memory"

[default]
type = "cache"
near = "memory1"
far = "memory2"
```

Note that each anonymous url causes a distinct instance to be created.

Change-Id: Iee5a07a94b063b5e767c704d9cad0114fa843164
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12146
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
This commit is contained in:
Yureka 2024-08-07 16:50:38 +02:00 committed by yuka
parent ba4e02c3ac
commit 52bb3c6d02
9 changed files with 151 additions and 91 deletions

View file

@ -15401,7 +15401,7 @@ rec {
"tonic-reflection" = [ "dep:tonic-reflection" ]; "tonic-reflection" = [ "dep:tonic-reflection" ];
"virtiofs" = [ "fs" "dep:vhost" "dep:vhost-user-backend" "dep:virtio-queue" "dep:vm-memory" "dep:vmm-sys-util" "dep:virtio-bindings" "fuse-backend-rs?/vhost-user-fs" "fuse-backend-rs?/virtiofs" ]; "virtiofs" = [ "fs" "dep:vhost" "dep:vhost-user-backend" "dep:virtio-queue" "dep:vm-memory" "dep:vmm-sys-util" "dep:virtio-bindings" "fuse-backend-rs?/vhost-user-fs" "fuse-backend-rs?/virtiofs" ];
}; };
resolvedDefaultFeatures = [ "cloud" "default" "fs" "fuse" "integration" "tonic-reflection" "virtiofs" ]; resolvedDefaultFeatures = [ "cloud" "default" "fs" "fuse" "integration" "tonic-reflection" "virtiofs" "xp-store-composition" ];
}; };
"tvix-cli" = rec { "tvix-cli" = rec {
crateName = "tvix-cli"; crateName = "tvix-cli";
@ -16170,7 +16170,7 @@ rec {
"tonic-reflection" = [ "dep:tonic-reflection" "tvix-castore/tonic-reflection" ]; "tonic-reflection" = [ "dep:tonic-reflection" "tvix-castore/tonic-reflection" ];
"tracy" = [ "tvix-tracing/tracy" ]; "tracy" = [ "tvix-tracing/tracy" ];
"virtiofs" = [ "tvix-castore/virtiofs" ]; "virtiofs" = [ "tvix-castore/virtiofs" ];
"xp-store-composition" = [ "toml" ]; "xp-store-composition" = [ "toml" "tvix-castore/xp-store-composition" ];
}; };
resolvedDefaultFeatures = [ "cloud" "default" "fuse" "integration" "otlp" "toml" "tonic-reflection" "tracy" "virtiofs" "xp-store-composition" ]; resolvedDefaultFeatures = [ "cloud" "default" "fuse" "integration" "otlp" "toml" "tonic-reflection" "tracy" "virtiofs" "xp-store-composition" ];
}; };

View file

@ -91,6 +91,11 @@ virtiofs = [
] ]
fuse = ["fs"] fuse = ["fs"]
tonic-reflection = ["dep:tonic-reflection"] tonic-reflection = ["dep:tonic-reflection"]
# It's already possible for other crates to build a
# fully fledged store composition system based on castore composition.
# However, this feature enables anonymous url syntax which might
# inherently expose arbitrary composition possibilities to the user.
xp-store-composition = []
# Whether to run the integration tests. # Whether to run the integration tests.
# Requires the following packages in $PATH: # Requires the following packages in $PATH:
# cbtemulator, google-cloud-bigtable-tool # cbtemulator, google-cloud-bigtable-tool

View file

@ -9,7 +9,7 @@
meta.ci.targets = [ "integration-tests" ] ++ lib.filter (x: lib.hasPrefix "with-features" x || x == "no-features") (lib.attrNames passthru); meta.ci.targets = [ "integration-tests" ] ++ lib.filter (x: lib.hasPrefix "with-features" x || x == "no-features") (lib.attrNames passthru);
passthru = (depot.tvix.utils.mkFeaturePowerset { passthru = (depot.tvix.utils.mkFeaturePowerset {
inherit (old) crateName; inherit (old) crateName;
features = ([ "cloud" "fuse" "tonic-reflection" ] features = ([ "cloud" "fuse" "tonic-reflection" "xp-store-composition" ]
# virtiofs feature currently fails to build on Darwin # virtiofs feature currently fails to build on Darwin
++ lib.optional pkgs.stdenv.isLinux "virtiofs"); ++ lib.optional pkgs.stdenv.isLinux "virtiofs");
override.testPreRun = '' override.testPreRun = ''

View file

@ -27,7 +27,7 @@ pub async fn from_addr(
})? })?
.0; .0;
let blob_service = blob_service_config let blob_service = blob_service_config
.build("anonymous", &CompositionContext::blank()) .build("anonymous", &CompositionContext::blank(&REG))
.await?; .await?;
Ok(blob_service) Ok(blob_service)

View file

@ -70,7 +70,7 @@
//! }); //! });
//! //!
//! let blob_services_configs = with_registry(&REG, || serde_json::from_value(blob_services_configs_json))?; //! let blob_services_configs = with_registry(&REG, || serde_json::from_value(blob_services_configs_json))?;
//! let mut blob_service_composition = Composition::default(); //! let mut blob_service_composition = Composition::new(&REG);
//! blob_service_composition.extend_with_configs::<dyn BlobService>(blob_services_configs); //! blob_service_composition.extend_with_configs::<dyn BlobService>(blob_services_configs);
//! let blob_service: Arc<dyn BlobService> = blob_service_composition.build("default").await?; //! let blob_service: Arc<dyn BlobService> = blob_service_composition.build("default").await?;
//! # Ok(()) //! # Ok(())
@ -88,6 +88,13 @@
//! ``` //! ```
//! //!
//! Continue with Example 2, with my_registry instead of REG //! Continue with Example 2, with my_registry instead of REG
//!
//! EXPERIMENTAL: If the xp-store-composition feature is enabled,
//! entrypoints can also be URL strings, which are created as
//! anonymous stores. Instantiations of the same URL will
//! result in a new, distinct anonymous store each time, so creating
//! two `memory://` stores with this method will not share the same view.
//! This behavior might change in the future.
use erased_serde::deserialize; use erased_serde::deserialize;
use futures::future::BoxFuture; use futures::future::BoxFuture;
@ -278,12 +285,15 @@ pub struct CompositionContext<'a> {
// The TypeId of the trait object is included to distinguish e.g. the // The TypeId of the trait object is included to distinguish e.g. the
// BlobService "default" and the DirectoryService "default". // BlobService "default" and the DirectoryService "default".
stack: Vec<(TypeId, String)>, stack: Vec<(TypeId, String)>,
registry: &'static Registry,
composition: Option<&'a Composition>, composition: Option<&'a Composition>,
} }
impl<'a> CompositionContext<'a> { impl<'a> CompositionContext<'a> {
pub fn blank() -> Self { /// Get a composition context for one-off store creation.
pub fn blank(registry: &'static Registry) -> Self {
Self { Self {
registry,
stack: Default::default(), stack: Default::default(),
composition: None, composition: None,
} }
@ -303,10 +313,104 @@ impl<'a> CompositionContext<'a> {
) )
.into()); .into());
} }
match self.composition {
Some(comp) => Ok(comp.build_internal(self.stack.clone(), entrypoint).await?), Ok(self.build_internal(entrypoint).await?)
None => Err(CompositionError::NotFound(entrypoint).into()),
} }
#[cfg(feature = "xp-store-composition")]
async fn build_anonymous<T: ?Sized + Send + Sync + 'static>(
&self,
entrypoint: String,
) -> Result<Arc<T>, Box<dyn std::error::Error + Send + Sync>> {
let url = url::Url::parse(&entrypoint)?;
let config: DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = T>>> =
with_registry(self.registry, || url.try_into())?;
config.0.build("anonymous", self).await
}
fn build_internal<T: ?Sized + Send + Sync + 'static>(
&self,
entrypoint: String,
) -> BoxFuture<'_, Result<Arc<T>, CompositionError>> {
#[cfg(feature = "xp-store-composition")]
if entrypoint.contains("://") {
// There is a chance this is a url. we are building an anonymous store
return Box::pin(async move {
self.build_anonymous(entrypoint.clone())
.await
.map_err(|e| CompositionError::Failed(entrypoint, Arc::from(e)))
});
}
let mut stores = match self.composition {
Some(comp) => comp.stores.lock().unwrap(),
None => return Box::pin(futures::future::err(CompositionError::NotFound(entrypoint))),
};
let entry = match stores.get_mut(&(TypeId::of::<T>(), entrypoint.clone())) {
Some(v) => v,
None => return Box::pin(futures::future::err(CompositionError::NotFound(entrypoint))),
};
// for lifetime reasons, we put a placeholder value in the hashmap while we figure out what
// the new value should be. the Mutex stays locked the entire time, so nobody will ever see
// this temporary value.
let prev_val = std::mem::replace(
entry,
Box::new(InstantiationState::<T>::Done(Err(
CompositionError::Poisoned(entrypoint.clone()),
))),
);
let (new_val, ret) = match *prev_val.downcast::<InstantiationState<T>>().unwrap() {
InstantiationState::Done(service) => (
InstantiationState::Done(service.clone()),
futures::future::ready(service).boxed(),
),
// the construction of the store has not started yet.
InstantiationState::Config(config) => {
let (tx, rx) = tokio::sync::watch::channel(None);
(
InstantiationState::InProgress(rx),
(async move {
let mut new_context = CompositionContext {
composition: self.composition,
registry: self.registry,
stack: self.stack.clone(),
};
new_context
.stack
.push((TypeId::of::<T>(), entrypoint.clone()));
let res =
config.build(&entrypoint, &new_context).await.map_err(|e| {
match e.downcast() {
Ok(e) => *e,
Err(e) => CompositionError::Failed(entrypoint, e.into()),
}
});
tx.send(Some(res.clone())).unwrap();
res
})
.boxed(),
)
}
// there is already a task driving forward the construction of this store, wait for it
// to notify us via the provided channel
InstantiationState::InProgress(mut recv) => {
(InstantiationState::InProgress(recv.clone()), {
(async move {
loop {
if let Some(v) =
recv.borrow_and_update().as_ref().map(|res| res.clone())
{
break v;
}
recv.changed().await.unwrap();
}
})
.boxed()
})
}
};
*entry = Box::new(new_val);
ret
} }
} }
@ -336,8 +440,8 @@ enum InstantiationState<T: ?Sized> {
Done(Result<Arc<T>, CompositionError>), Done(Result<Arc<T>, CompositionError>),
} }
#[derive(Default)]
pub struct Composition { pub struct Composition {
registry: &'static Registry,
stores: std::sync::Mutex<HashMap<(TypeId, String), Box<dyn Any + Send + Sync>>>, stores: std::sync::Mutex<HashMap<(TypeId, String), Box<dyn Any + Send + Sync>>>,
} }
@ -381,6 +485,14 @@ impl<T: ?Sized + Send + Sync + 'static>
} }
impl Composition { impl Composition {
/// The given registry will be used for creation of anonymous stores during composition
pub fn new(registry: &'static Registry) -> Self {
Self {
registry,
stores: Default::default(),
}
}
pub fn extend_with_configs<T: ?Sized + Send + Sync + 'static>( pub fn extend_with_configs<T: ?Sized + Send + Sync + 'static>(
&mut self, &mut self,
// Keep the concrete `HashMap` type here since it allows for type // Keep the concrete `HashMap` type here since it allows for type
@ -390,87 +502,17 @@ impl Composition {
self.extend(configs); self.extend(configs);
} }
/// Looks up the entrypoint name in the composition and returns an instantiated service.
pub async fn build<T: ?Sized + Send + Sync + 'static>( pub async fn build<T: ?Sized + Send + Sync + 'static>(
&self, &self,
entrypoint: &str, entrypoint: &str,
) -> Result<Arc<T>, CompositionError> { ) -> Result<Arc<T>, CompositionError> {
self.build_internal(vec![], entrypoint.to_string()).await self.context().build_internal(entrypoint.to_string()).await
}
fn build_internal<T: ?Sized + Send + Sync + 'static>(
&self,
stack: Vec<(TypeId, String)>,
entrypoint: String,
) -> BoxFuture<'_, Result<Arc<T>, CompositionError>> {
let mut stores = self.stores.lock().unwrap();
let entry = match stores.get_mut(&(TypeId::of::<T>(), entrypoint.clone())) {
Some(v) => v,
None => return Box::pin(futures::future::err(CompositionError::NotFound(entrypoint))),
};
// for lifetime reasons, we put a placeholder value in the hashmap while we figure out what
// the new value should be. the Mutex stays locked the entire time, so nobody will ever see
// this temporary value.
let prev_val = std::mem::replace(
entry,
Box::new(InstantiationState::<T>::Done(Err(
CompositionError::Poisoned(entrypoint.clone()),
))),
);
let (new_val, ret) = match *prev_val.downcast::<InstantiationState<T>>().unwrap() {
InstantiationState::Done(service) => (
InstantiationState::Done(service.clone()),
futures::future::ready(service).boxed(),
),
// the construction of the store has not started yet.
InstantiationState::Config(config) => {
let (tx, rx) = tokio::sync::watch::channel(None);
(
InstantiationState::InProgress(rx),
(async move {
let mut new_context = CompositionContext {
stack: stack.clone(),
composition: Some(self),
};
new_context
.stack
.push((TypeId::of::<T>(), entrypoint.clone()));
let res =
config.build(&entrypoint, &new_context).await.map_err(|e| {
match e.downcast() {
Ok(e) => *e,
Err(e) => CompositionError::Failed(entrypoint, e.into()),
}
});
tx.send(Some(res.clone())).unwrap();
res
})
.boxed(),
)
}
// there is already a task driving forward the construction of this store, wait for it
// to notify us via the provided channel
InstantiationState::InProgress(mut recv) => {
(InstantiationState::InProgress(recv.clone()), {
(async move {
loop {
if let Some(v) =
recv.borrow_and_update().as_ref().map(|res| res.clone())
{
break v;
}
recv.changed().await.unwrap();
}
})
.boxed()
})
}
};
*entry = Box::new(new_val);
ret
} }
pub fn context(&self) -> CompositionContext { pub fn context(&self) -> CompositionContext {
CompositionContext { CompositionContext {
registry: self.registry,
stack: vec![], stack: vec![],
composition: Some(self), composition: Some(self),
} }
@ -496,7 +538,7 @@ mod test {
let blob_services_configs = let blob_services_configs =
with_registry(&REG, || serde_json::from_value(blob_services_configs_json)).unwrap(); with_registry(&REG, || serde_json::from_value(blob_services_configs_json)).unwrap();
let mut blob_service_composition = Composition::default(); let mut blob_service_composition = Composition::new(&REG);
blob_service_composition.extend_with_configs::<dyn BlobService>(blob_services_configs); blob_service_composition.extend_with_configs::<dyn BlobService>(blob_services_configs);
let (blob_service1, blob_service2) = tokio::join!( let (blob_service1, blob_service2) = tokio::join!(
blob_service_composition.build::<dyn BlobService>("default"), blob_service_composition.build::<dyn BlobService>("default"),
@ -526,7 +568,7 @@ mod test {
let blob_services_configs = let blob_services_configs =
with_registry(&REG, || serde_json::from_value(blob_services_configs_json)).unwrap(); with_registry(&REG, || serde_json::from_value(blob_services_configs_json)).unwrap();
let mut blob_service_composition = Composition::default(); let mut blob_service_composition = Composition::new(&REG);
blob_service_composition.extend_with_configs::<dyn BlobService>(blob_services_configs); blob_service_composition.extend_with_configs::<dyn BlobService>(blob_services_configs);
match blob_service_composition match blob_service_composition
.build::<dyn BlobService>("default") .build::<dyn BlobService>("default")

View file

@ -37,7 +37,7 @@ pub async fn from_addr(
})? })?
.0; .0;
let directory_service = directory_service_config let directory_service = directory_service_config
.build("anonymous", &CompositionContext::blank()) .build("anonymous", &CompositionContext::blank(&REG))
.await?; .await?;
Ok(directory_service) Ok(directory_service)
@ -88,6 +88,16 @@ mod tests {
#[case::grpc_valid_https_host_without_port("grpc+https://localhost", true)] #[case::grpc_valid_https_host_without_port("grpc+https://localhost", true)]
/// Correct scheme to connect to localhost over http, but with additional path, which is invalid. /// Correct scheme to connect to localhost over http, but with additional path, which is invalid.
#[case::grpc_invalid_host_and_path("grpc+http://localhost/some-path", false)] #[case::grpc_invalid_host_and_path("grpc+http://localhost/some-path", false)]
/// A valid example for store composition using anonymous urls
#[cfg_attr(
feature = "xp-store-composition",
case::anonymous_url_composition("cache://?near=memory://&far=memory://", true)
)]
/// Store composition with anonymous urls should fail if the feature is disabled
#[cfg_attr(
not(feature = "xp-store-composition"),
case::anonymous_url_composition("cache://?near=memory://&far=memory://", false)
)]
/// A valid example for Bigtable /// A valid example for Bigtable
#[cfg_attr( #[cfg_attr(
all(feature = "cloud", feature = "integration"), all(feature = "cloud", feature = "integration"),

View file

@ -73,7 +73,7 @@ otlp = ["tvix-tracing/otlp"]
tonic-reflection = ["dep:tonic-reflection", "tvix-castore/tonic-reflection"] tonic-reflection = ["dep:tonic-reflection", "tvix-castore/tonic-reflection"]
tracy = ["tvix-tracing/tracy"] tracy = ["tvix-tracing/tracy"]
virtiofs = ["tvix-castore/virtiofs"] virtiofs = ["tvix-castore/virtiofs"]
xp-store-composition = ["toml"] xp-store-composition = ["toml", "tvix-castore/xp-store-composition"]
# Whether to run the integration tests. # Whether to run the integration tests.
# Requires the following packages in $PATH: # Requires the following packages in $PATH:
# cbtemulator, google-cloud-bigtable-tool # cbtemulator, google-cloud-bigtable-tool

View file

@ -44,7 +44,10 @@ pub async fn from_addr(
})? })?
.0; .0;
let path_info_service = path_info_service_config let path_info_service = path_info_service_config
.build("anonymous", context.unwrap_or(&CompositionContext::blank())) .build(
"anonymous",
context.unwrap_or(&CompositionContext::blank(&REG)),
)
.await?; .await?;
Ok(path_info_service) Ok(path_info_service)
@ -53,7 +56,7 @@ pub async fn from_addr(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::from_addr; use super::from_addr;
use crate::composition::{Composition, DeserializeWithRegistry, ServiceBuilder}; use crate::composition::{Composition, DeserializeWithRegistry, ServiceBuilder, REG};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use rstest::rstest; use rstest::rstest;
use tempfile::TempDir; use tempfile::TempDir;
@ -125,7 +128,7 @@ mod tests {
)] )]
#[tokio::test] #[tokio::test]
async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] exp_succeed: bool) { async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] exp_succeed: bool) {
let mut comp = Composition::default(); let mut comp = Composition::new(&REG);
comp.extend(vec![( comp.extend(vec![(
"default".into(), "default".into(),
DeserializeWithRegistry(Box::new(MemoryBlobServiceConfig {}) DeserializeWithRegistry(Box::new(MemoryBlobServiceConfig {})

View file

@ -188,7 +188,7 @@ pub async fn construct_services_from_configs(
), ),
Box<dyn std::error::Error + Send + Sync>, Box<dyn std::error::Error + Send + Sync>,
> { > {
let mut comp = Composition::default(); let mut comp = Composition::new(&REG);
comp.extend(configs.blobservices); comp.extend(configs.blobservices);
comp.extend(configs.directoryservices); comp.extend(configs.directoryservices);