From 1a6b6e3ef310c8eea37b55f8007c85a8772ff8e9 Mon Sep 17 00:00:00 2001 From: Yureka Date: Mon, 17 Jun 2024 01:10:55 +0200 Subject: [PATCH] feat(tvix/castore): add composition module Change-Id: I0868f3278db85ae5fe030089ee9033837bc08748 Signed-off-by: Yureka Reviewed-on: https://cl.tvl.fyi/c/depot/+/11853 Reviewed-by: flokli Tested-by: BuildkiteCI --- tvix/Cargo.lock | 24 +- tvix/Cargo.nix | 59 ++- tvix/castore/Cargo.toml | 3 + tvix/castore/src/blobservice/combinator.rs | 29 ++ tvix/castore/src/blobservice/grpc.rs | 22 + tvix/castore/src/blobservice/memory.rs | 17 + tvix/castore/src/blobservice/mod.rs | 18 +- tvix/castore/src/blobservice/object_store.rs | 36 ++ tvix/castore/src/composition.rs | 377 ++++++++++++++++++ tvix/castore/src/directoryservice/bigtable.rs | 87 ++-- .../src/directoryservice/combinators.rs | 29 ++ tvix/castore/src/directoryservice/grpc.rs | 23 ++ tvix/castore/src/directoryservice/memory.rs | 17 + tvix/castore/src/directoryservice/mod.rs | 25 +- .../src/directoryservice/object_store.rs | 31 +- tvix/castore/src/lib.rs | 1 + 16 files changed, 747 insertions(+), 51 deletions(-) create mode 100644 tvix/castore/src/composition.rs diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock index fdbd779e7..793938335 100644 --- a/tvix/Cargo.lock +++ b/tvix/Cargo.lock @@ -1067,11 +1067,12 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "erased-serde" -version = "0.4.4" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b73807008a3c7f171cc40312f37d95ef0396e048b5848d775f54b1a4dd4a0d3" +checksum = "24e2389d65ab4fab27dc2a5de7b191e1f6617d1f1c8855c0dc569c94a4cbb18d" dependencies = [ "serde", + "typeid", ] [[package]] @@ -3420,6 +3421,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_tagged" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76cd248df2ce32924bfc2273e1af035ff3092b73253fe0567230b5c4154a99e9" +dependencies = [ + "erased-serde", + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -4329,6 +4340,7 @@ dependencies = [ "bytes", "data-encoding", "digest", + "erased-serde", "fastcdc", "fuse-backend-rs", "futures", @@ -4344,7 +4356,9 @@ dependencies = [ "rstest", "rstest_reuse", "serde", + "serde_json", "serde_qs", + "serde_tagged", "serde_with", "sled", "tempfile", @@ -4575,6 +4589,12 @@ dependencies = [ "tracing-tracy", ] +[[package]] +name = "typeid" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "059d83cc991e7a42fc37bd50941885db0888e34209f8cfd9aab07ddec03bc9cf" + [[package]] name = "typenum" version = "1.17.0" diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index 8741c60bf..8a52332ab 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -3213,9 +3213,10 @@ rec { }; "erased-serde" = rec { crateName = "erased-serde"; - version = "0.4.4"; + version = "0.4.5"; edition = "2021"; - sha256 = "1lx0si6iljzmfpblhn4b0ip3kw2yv4vjyca0riqz3ix311q80wrb"; + sha256 = "13dirfj9972nvk05b20w3xyn3xp1j6qyfp9avhksnkxbcnfkiqi4"; + libName = "erased_serde"; authors = [ "David Tolnay " ]; @@ -3225,13 +3226,17 @@ rec { packageId = "serde"; usesDefaultFeatures = false; } + { + name = "typeid"; + packageId = "typeid"; + } ]; features = { "alloc" = [ "serde/alloc" ]; "default" = [ "std" ]; "std" = [ "alloc" "serde/std" ]; }; - resolvedDefaultFeatures = [ "alloc" ]; + resolvedDefaultFeatures = [ "alloc" "default" "std" ]; }; "errno" = rec { crateName = "errno"; @@ -10539,6 +10544,32 @@ rec { }; resolvedDefaultFeatures = [ "serde" ]; }; + "serde_tagged" = rec { + crateName = "serde_tagged"; + version = "0.3.0"; + edition = "2015"; + sha256 = "1scr98aw9d9hf9bf0gr5fcmhkwsz0fpy2wr2zi5r4cnfya6j9kbn"; + authors = [ + "qzed " + ]; + dependencies = [ + { + name = "erased-serde"; + packageId = "erased-serde"; + optional = true; + } + { + name = "serde"; + packageId = "serde"; + } + ]; + features = { + "default" = [ "erased" ]; + "erased" = [ "erased-serde" ]; + "erased-serde" = [ "dep:erased-serde" ]; + }; + resolvedDefaultFeatures = [ "default" "erased" "erased-serde" ]; + }; "serde_urlencoded" = rec { crateName = "serde_urlencoded"; version = "0.7.1"; @@ -13634,6 +13665,10 @@ rec { name = "digest"; packageId = "digest"; } + { + name = "erased-serde"; + packageId = "erased-serde"; + } { name = "fastcdc"; packageId = "fastcdc"; @@ -13687,6 +13722,10 @@ rec { name = "serde_qs"; packageId = "serde_qs"; } + { + name = "serde_tagged"; + packageId = "serde_tagged"; + } { name = "serde_with"; packageId = "serde_with"; @@ -13819,6 +13858,10 @@ rec { name = "rstest_reuse"; packageId = "rstest_reuse"; } + { + name = "serde_json"; + packageId = "serde_json"; + } { name = "tempfile"; packageId = "tempfile"; @@ -14661,6 +14704,16 @@ rec { }; resolvedDefaultFeatures = [ "default" "otlp" "reqwest" "tonic" "tracy" ]; }; + "typeid" = rec { + crateName = "typeid"; + version = "1.0.0"; + edition = "2021"; + sha256 = "1ky97g0dwzdhmbcwzy098biqh26vhlc98l5x6zy44yhyk7687785"; + authors = [ + "David Tolnay " + ]; + + }; "typenum" = rec { crateName = "typenum"; version = "1.17.0"; diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml index d5d5c73f0..f428101bd 100644 --- a/tvix/castore/Cargo.toml +++ b/tvix/castore/Cargo.toml @@ -37,6 +37,8 @@ serde = { version = "1.0.197", features = [ "derive" ] } serde_with = "3.7.0" serde_qs = "0.12.0" petgraph = "0.6.4" +erased-serde = "0.4.5" +serde_tagged = "0.3.0" [dependencies.bigtable_rs] optional = true @@ -94,6 +96,7 @@ tokio-retry = "0.3.0" hex-literal = "0.4.1" rstest_reuse = "0.6.0" xattr = "1.3.1" +serde_json = "*" [features] default = ["cloud"] diff --git a/tvix/castore/src/blobservice/combinator.rs b/tvix/castore/src/blobservice/combinator.rs index 067eff96f..0bce657e9 100644 --- a/tvix/castore/src/blobservice/combinator.rs +++ b/tvix/castore/src/blobservice/combinator.rs @@ -1,8 +1,11 @@ +use std::sync::Arc; + use futures::{StreamExt, TryStreamExt}; use tokio_util::io::{ReaderStream, StreamReader}; use tonic::async_trait; use tracing::{instrument, warn}; +use crate::composition::{CompositionContext, ServiceBuilder}; use crate::B3Digest; use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter}; @@ -93,6 +96,32 @@ where } } +#[derive(serde::Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] +pub struct CombinedBlobServiceConfig { + local: String, + remote: String, +} + +#[async_trait] +impl ServiceBuilder for CombinedBlobServiceConfig { + type Output = dyn BlobService; + async fn build<'a>( + &'a self, + _instance_name: &str, + context: &CompositionContext, + ) -> Result, Box> { + let (local, remote) = futures::join!( + context.resolve(self.local.clone()), + context.resolve(self.remote.clone()) + ); + Ok(Arc::new(CombinedBlobService { + local: local?, + remote: remote?, + })) + } +} + fn make_chunked_reader( // This must consume, as we can't retain references to blob_service, // as it'd add a lifetime to BlobReader in general, which will get diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index 85250da99..56a2ebf00 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -1,4 +1,5 @@ use super::{BlobReader, BlobService, BlobWriter, ChunkedReader}; +use crate::composition::{CompositionContext, ServiceBuilder}; use crate::{ proto::{self, stat_blob_response::ChunkMeta}, B3Digest, @@ -180,6 +181,27 @@ where } } +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct GRPCBlobServiceConfig { + url: String, +} + +#[async_trait] +impl ServiceBuilder for GRPCBlobServiceConfig { + type Output = dyn BlobService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result, Box> { + let client = proto::blob_service_client::BlobServiceClient::new( + crate::tonic::channel_from_url(&self.url.parse()?).await?, + ); + Ok(Arc::new(GRPCBlobService::from_client(client))) + } +} + pub struct GRPCBlobWriter { /// The task containing the put request, and the inner writer, if we're still writing. task_and_writer: Option<(JoinHandle>, W)>, diff --git a/tvix/castore/src/blobservice/memory.rs b/tvix/castore/src/blobservice/memory.rs index 873d06b46..0205dcf7b 100644 --- a/tvix/castore/src/blobservice/memory.rs +++ b/tvix/castore/src/blobservice/memory.rs @@ -6,6 +6,7 @@ use tonic::async_trait; use tracing::instrument; use super::{BlobReader, BlobService, BlobWriter}; +use crate::composition::{CompositionContext, ServiceBuilder}; use crate::B3Digest; #[derive(Clone, Default)] @@ -37,6 +38,22 @@ impl BlobService for MemoryBlobService { } } +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct MemoryBlobServiceConfig {} + +#[async_trait] +impl ServiceBuilder for MemoryBlobServiceConfig { + type Output = dyn BlobService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result, Box> { + Ok(Arc::new(MemoryBlobService::default())) + } +} + pub struct MemoryBlobWriter { db: Arc>>>, diff --git a/tvix/castore/src/blobservice/mod.rs b/tvix/castore/src/blobservice/mod.rs index 50acd40bf..83fb5b674 100644 --- a/tvix/castore/src/blobservice/mod.rs +++ b/tvix/castore/src/blobservice/mod.rs @@ -1,6 +1,8 @@ use std::io; + use tonic::async_trait; +use crate::composition::{Registry, ServiceBuilder}; use crate::proto::stat_blob_response::ChunkMeta; use crate::B3Digest; @@ -16,11 +18,11 @@ mod object_store; pub mod tests; pub use self::chunked_reader::ChunkedReader; -pub use self::combinator::CombinedBlobService; +pub use self::combinator::{CombinedBlobService, CombinedBlobServiceConfig}; pub use self::from_addr::from_addr; -pub use self::grpc::GRPCBlobService; -pub use self::memory::MemoryBlobService; -pub use self::object_store::ObjectStoreBlobService; +pub use self::grpc::{GRPCBlobService, GRPCBlobServiceConfig}; +pub use self::memory::{MemoryBlobService, MemoryBlobServiceConfig}; +pub use self::object_store::{ObjectStoreBlobService, ObjectStoreBlobServiceConfig}; /// The base trait all BlobService services need to implement. /// It provides functions to check whether a given blob exists, @@ -101,3 +103,11 @@ impl BlobReader for io::Cursor<&'static [u8; 0]> {} impl BlobReader for io::Cursor> {} impl BlobReader for io::Cursor {} impl BlobReader for tokio::fs::File {} + +/// Registers the builtin BlobService implementations with the registry +pub(crate) fn register_blob_services(reg: &mut Registry) { + reg.register::>, super::blobservice::ObjectStoreBlobServiceConfig>("objectstore"); + reg.register::>, super::blobservice::MemoryBlobServiceConfig>("memory"); + reg.register::>, super::blobservice::CombinedBlobServiceConfig>("combined"); + reg.register::>, super::blobservice::GRPCBlobServiceConfig>("grpc"); +} diff --git a/tvix/castore/src/blobservice/object_store.rs b/tvix/castore/src/blobservice/object_store.rs index d2d0a288a..bd8c2bd74 100644 --- a/tvix/castore/src/blobservice/object_store.rs +++ b/tvix/castore/src/blobservice/object_store.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashMap, io::{self, Cursor}, pin::pin, sync::Arc, @@ -18,6 +19,7 @@ use tracing::{debug, instrument, trace, Level}; use url::Url; use crate::{ + composition::{CompositionContext, ServiceBuilder}, proto::{stat_blob_response::ChunkMeta, StatBlobResponse}, B3Digest, B3HashingReader, }; @@ -269,6 +271,40 @@ impl BlobService for ObjectStoreBlobService { } } +fn default_avg_chunk_size() -> u32 { + 256 * 1024 +} + +#[derive(serde::Deserialize)] +#[serde(deny_unknown_fields)] +pub struct ObjectStoreBlobServiceConfig { + object_store_url: String, + #[serde(default = "default_avg_chunk_size")] + avg_chunk_size: u32, + #[serde(default)] + object_store_options: HashMap, +} + +#[async_trait] +impl ServiceBuilder for ObjectStoreBlobServiceConfig { + type Output = dyn BlobService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result, Box> { + let (object_store, path) = object_store::parse_url_opts( + &self.object_store_url.parse()?, + &self.object_store_options, + )?; + Ok(Arc::new(ObjectStoreBlobService { + object_store: Arc::new(object_store), + base_path: path, + avg_chunk_size: self.avg_chunk_size, + })) + } +} + /// Reads blob contents from a AsyncRead, chunks and uploads them. /// On success, returns a [StatBlobResponse] pointing to the individual chunks. #[instrument(skip_all, fields(base_path=%base_path, min_chunk_size, avg_chunk_size, max_chunk_size), err)] diff --git a/tvix/castore/src/composition.rs b/tvix/castore/src/composition.rs new file mode 100644 index 000000000..88cedcb83 --- /dev/null +++ b/tvix/castore/src/composition.rs @@ -0,0 +1,377 @@ +//! The composition module allows composing different kinds of services based on a set of service +//! configurations _at runtime_. +//! +//! Store configs are deserialized with serde. The registry provides a stateful mapping from the +//! `type` tag of an internally tagged enum on the serde side to a Config struct which is +//! deserialized and then returned as a `Box>` +//! (the same for DirectoryService instead of BlobService etc). +//! +//! ### Example 1.: Implementing a new BlobService +//! +//! You need a Config struct which implements `DeserializeOwned` and +//! `ServiceBuilder`. +//! Provide the user with a function to call with +//! their registry. You register your new type as: +//! +//! ``` +//! use std::sync::Arc; +//! +//! use tvix_castore::composition::*; +//! use tvix_castore::blobservice::BlobService; +//! +//! #[derive(serde::Deserialize)] +//! struct MyBlobServiceConfig { +//! } +//! +//! #[tonic::async_trait] +//! impl ServiceBuilder for MyBlobServiceConfig { +//! type Output = dyn BlobService; +//! async fn build(&self, _: &str, _: &CompositionContext) -> Result, Box> { +//! todo!() +//! } +//! } +//! +//! pub fn add_my_service(reg: &mut Registry) { +//! reg.register::>, MyBlobServiceConfig>("myblobservicetype"); +//! } +//! ``` +//! +//! Now, when a user deserializes a store config with the type tag "myblobservicetype" into a +//! `Box>>`, it will be done via `MyBlobServiceConfig`. +//! +//! ### Example 2.: Composing stores to get one store +//! +//! ``` +//! use tvix_castore::composition::*; +//! use tvix_castore::blobservice::BlobService; +//! +//! # fn main() -> Result<(), Box> { +//! # tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async move { +//! let blob_services_configs_json = serde_json::json!({ +//! "blobstore1": { +//! "type": "memory" +//! }, +//! "blobstore2": { +//! "type": "memory" +//! }, +//! "default": { +//! "type": "combined", +//! "local": "blobstore1", +//! "remote": "blobstore2" +//! } +//! }); +//! +//! let blob_services_configs = with_registry(®, || serde_json::from_value(blob_services_configs_json))?; +//! let blob_service_composition = Composition::::from_configs(blob_services_configs); +//! let blob_service = blob_service_composition.build("default").await?; +//! # Ok(()) +//! # }) +//! # } +//! ``` +//! +//! ### Example 3.: Creating another registry extending the default registry with third-party types +//! +//! ``` +//! # pub fn add_my_service(reg: &mut tvix_castore::composition::Registry) {} +//! let mut my_registry = tvix_castore::composition::Registry::default(); +//! tvix_castore::composition::add_default_services(&mut my_registry); +//! add_my_service(&mut my_registry); +//! ``` +//! +//! Continue with Example 2, with my_registry instead of REG + +use erased_serde::deserialize; +use futures::future::BoxFuture; +use futures::FutureExt; +use lazy_static::lazy_static; +use serde::de::DeserializeOwned; +use serde_tagged::de::{BoxFnSeed, SeedFactory}; +use serde_tagged::util::TagString; +use std::any::{Any, TypeId}; +use std::cell::Cell; +use std::collections::BTreeMap; +use std::collections::HashMap; +use std::marker::PhantomData; +use std::sync::Arc; +use tonic::async_trait; + +/// Resolves tag names to the corresponding Config type. +// Registry implementation details: +// This is really ugly. Really we would want to store this as a generic static field: +// +// ``` +// struct Registry(BTreeMap<(&'static str), BoxSeedFn); +// static REG: Registry; +// ``` +// +// so that one version of the static is generated for each Type that the registry is accessed for. +// However, this is not possible, because generics are only a thing in functions, and even there +// they will not interact with static items: +// https://doc.rust-lang.org/reference/items/static-items.html#statics--generics +// +// So instead, we make this lookup at runtime by putting the TypeId into the key. +// But now we can no longer store the `BoxFnSeed` because we are lacking the generic parameter +// T, so instead store it as `Box` and downcast to `&BoxFnSeed` when performing the +// lookup. +// I said it was ugly... +#[derive(Default)] +pub struct Registry(BTreeMap<(TypeId, &'static str), Box>); + +struct RegistryWithFakeType<'r, T>(&'r Registry, PhantomData); + +impl<'r, 'de: 'r, T: 'static> SeedFactory<'de, TagString<'de>> for RegistryWithFakeType<'r, T> { + type Value = DeserializeWithRegistry; + type Seed = &'r BoxFnSeed; + + // Required method + fn seed(self, tag: TagString<'de>) -> Result + where + E: serde::de::Error, + { + // using find() and not get() because of https://github.com/rust-lang/rust/issues/80389 + let seed: &Box = self + .0 + .0 + .iter() + .find(|(k, _)| *k == &(TypeId::of::(), tag.as_ref())) + .ok_or_else(|| serde::de::Error::custom("Unknown tag"))? + .1; + + Ok(::downcast_ref(&**seed).unwrap()) + } +} + +/// Wrapper type which implements Deserialize using the registry +/// +/// Wrap your type in this in order to deserialize it using a registry, e.g. +/// `RegistryWithFakeType>`, then the types registered for `Box` +/// will be used. +pub struct DeserializeWithRegistry(T); + +impl Registry { + /// Registers a mapping from type tag to a concrete type into the registry. + /// + /// The type parameters are very important: + /// After calling `register::, FooStruct>("footype")`, when a user + /// deserializes into an input with the type tag "myblobservicetype" into a + /// `Box`, it will first call the Deserialize imple of `FooStruct` and + /// then convert it into a `Box` using From::from. + pub fn register>(&mut self, type_name: &'static str) { + let seed = BoxFnSeed::new(|x| { + deserialize::(x) + .map(Into::into) + .map(DeserializeWithRegistry) + }); + self.0 + .insert((TypeId::of::(), type_name), Box::new(seed)); + } +} + +impl<'de, T: 'static> serde::Deserialize<'de> for DeserializeWithRegistry { + fn deserialize(de: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + serde_tagged::de::internal::deserialize( + de, + "type", + RegistryWithFakeType(ACTIVE_REG.get().unwrap(), PhantomData::), + ) + } +} + +thread_local! { + /// The active Registry is global state, because there is no convenient and universal way to pass state + /// into the functions usually used for deserialization, e.g. `serde_json::from_str`, `toml::from_str`, + /// `serde_qs::from_str`. + static ACTIVE_REG: Cell> = panic!("reg was accessed before initialization"); +} + +/// Run the provided closure with a registry context. +/// Any serde deserialize calls within the closure will use the registry to resolve tag names to +/// the corresponding Config type. +pub fn with_registry(reg: &'static Registry, f: impl FnOnce() -> R) -> R { + ACTIVE_REG.set(Some(reg)); + let result = f(); + ACTIVE_REG.set(None); + result +} + +lazy_static! { + /// The provided registry of tvix_castore, with all builtin BlobStore/DirectoryStore implementations + pub static ref REG: Registry = { + let mut reg = Registry(Default::default()); + add_default_services(&mut reg); + reg + }; +} + +// ---------- End of generic registry code --------- // + +/// Register the builtin services of tvix_castore with the given registry. +/// This is useful for creating your own registry with the builtin types _and_ +/// extra third party types. +pub fn add_default_services(reg: &mut Registry) { + crate::blobservice::register_blob_services(reg); + crate::directoryservice::register_directory_services(reg); +} + +pub struct CompositionContext<'a, T: ?Sized> { + stack: Vec, + composition: &'a Composition, +} + +impl<'a, T: ?Sized + Send + Sync + 'static> CompositionContext<'a, T> { + pub async fn resolve( + &self, + entrypoint: String, + ) -> Result, Box> { + // disallow recursion + if self.stack.contains(&entrypoint) { + return Err(CompositionError::Recursion(self.stack.clone()).into()); + } + Ok(self + .composition + .build_internal(self.stack.clone(), entrypoint) + .await?) + } +} + +#[async_trait] +/// This is the trait usually implemented on a per-store-type Config struct and +/// used to instantiate it. +pub trait ServiceBuilder: Send + Sync { + type Output: ?Sized; + async fn build( + &self, + instance_name: &str, + context: &CompositionContext, + ) -> Result, Box>; +} + +impl + 'static> From + for Box> +{ + fn from(t: S) -> Self { + Box::new(t) + } +} + +enum InstantiationState { + Config(Box>), + InProgress(tokio::sync::watch::Receiver, CompositionError>>>), + Done(Result, CompositionError>), +} + +pub struct Composition { + stores: std::sync::Mutex>>, +} + +#[derive(thiserror::Error, Clone, Debug)] +pub enum CompositionError { + #[error("store not found: {0}")] + NotFound(String), + #[error("recursion not allowed {0:?}")] + Recursion(Vec), + #[error("store construction panicked {0}")] + Poisoned(String), + #[error("instantiation of service {0} failed: {1}")] + Failed(String, Arc), +} + +impl Composition { + pub fn from_configs( + // Keep the concrete `HashMap` type here since it allows for type + // inference of what type is previously being deserialized. + configs: HashMap>>>, + ) -> Self { + Self::from_configs_iter(configs) + } + + pub fn from_configs_iter( + configs: impl IntoIterator< + Item = ( + String, + DeserializeWithRegistry>>, + ), + >, + ) -> Self { + Composition { + stores: std::sync::Mutex::new( + configs + .into_iter() + .map(|(k, v)| (k, InstantiationState::Config(v.0))) + .collect(), + ), + } + } + + pub async fn build(&self, entrypoint: &str) -> Result, CompositionError> { + self.build_internal(vec![], entrypoint.to_string()).await + } + + fn build_internal( + &self, + stack: Vec, + entrypoint: String, + ) -> BoxFuture<'_, Result, CompositionError>> { + let mut stores = self.stores.lock().unwrap(); + let entry = match stores.get_mut(&entrypoint) { + Some(v) => v, + None => return Box::pin(futures::future::err(CompositionError::NotFound(entrypoint))), + }; + // for lifetime reasons, we put a placeholder value in the hashmap while we figure out what + // the new value should be. the Mutex stays locked the entire time, so nobody will ever see + // this temporary value. + let prev_val = std::mem::replace( + entry, + InstantiationState::Done(Err(CompositionError::Poisoned(entrypoint.clone()))), + ); + let (new_val, ret) = match prev_val { + InstantiationState::Done(service) => ( + InstantiationState::Done(service.clone()), + futures::future::ready(service).boxed(), + ), + // the construction of the store has not started yet. + InstantiationState::Config(config) => { + let (tx, rx) = tokio::sync::watch::channel(None); + ( + InstantiationState::InProgress(rx), + (async move { + let mut new_context = CompositionContext { + stack: stack.clone(), + composition: self, + }; + new_context.stack.push(entrypoint.clone()); + let res = config + .build(&entrypoint, &new_context) + .await + .map_err(|e| CompositionError::Failed(entrypoint, e.into())); + tx.send(Some(res.clone())).unwrap(); + res + }) + .boxed(), + ) + } + // there is already a task driving forward the construction of this store, wait for it + // to notify us via the provided channel + InstantiationState::InProgress(mut recv) => { + (InstantiationState::InProgress(recv.clone()), { + (async move { + loop { + if let Some(v) = + recv.borrow_and_update().as_ref().map(|res| res.clone()) + { + break v; + } + recv.changed().await.unwrap(); + } + }) + .boxed() + }) + } + }; + *entry = new_val; + ret + } +} diff --git a/tvix/castore/src/directoryservice/bigtable.rs b/tvix/castore/src/directoryservice/bigtable.rs index 1194c6ddc..69edc05d7 100644 --- a/tvix/castore/src/directoryservice/bigtable.rs +++ b/tvix/castore/src/directoryservice/bigtable.rs @@ -5,10 +5,12 @@ use futures::stream::BoxStream; use prost::Message; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DurationSeconds}; +use std::sync::Arc; use tonic::async_trait; use tracing::{instrument, trace, warn}; use super::{utils::traverse_directory, DirectoryPutter, DirectoryService, SimplePutter}; +use crate::composition::{CompositionContext, ServiceBuilder}; use crate::{proto, B3Digest, Error}; /// There should not be more than 10 MiB in a single cell. @@ -43,41 +45,6 @@ pub struct BigtableDirectoryService { emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>, } -/// Represents configuration of [BigtableDirectoryService]. -/// This currently conflates both connect parameters and data model/client -/// behaviour parameters. -#[serde_as] -#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] -pub struct BigtableParameters { - project_id: String, - instance_name: String, - #[serde(default)] - is_read_only: bool, - #[serde(default = "default_channel_size")] - channel_size: usize, - - #[serde_as(as = "Option>")] - #[serde(default = "default_timeout")] - timeout: Option, - table_name: String, - family_name: String, - - #[serde(default = "default_app_profile_id")] - app_profile_id: String, -} - -fn default_app_profile_id() -> String { - "default".to_owned() -} - -fn default_channel_size() -> usize { - 4 -} - -fn default_timeout() -> Option { - Some(std::time::Duration::from_secs(4)) -} - impl BigtableDirectoryService { #[cfg(not(test))] pub async fn connect(params: BigtableParameters) -> Result { @@ -355,3 +322,53 @@ impl DirectoryService for BigtableDirectoryService { Box::new(SimplePutter::new(self.clone())) } } + +/// Represents configuration of [BigtableDirectoryService]. +/// This currently conflates both connect parameters and data model/client +/// behaviour parameters. +#[serde_as] +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub struct BigtableParameters { + project_id: String, + instance_name: String, + #[serde(default)] + is_read_only: bool, + #[serde(default = "default_channel_size")] + channel_size: usize, + + #[serde_as(as = "Option>")] + #[serde(default = "default_timeout")] + timeout: Option, + table_name: String, + family_name: String, + + #[serde(default = "default_app_profile_id")] + app_profile_id: String, +} + +#[async_trait] +impl ServiceBuilder for BigtableParameters { + type Output = dyn DirectoryService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result, Box> { + Ok(Arc::new( + BigtableDirectoryService::connect(self.clone()).await?, + )) + } +} + +fn default_app_profile_id() -> String { + "default".to_owned() +} + +fn default_channel_size() -> usize { + 4 +} + +fn default_timeout() -> Option { + Some(std::time::Duration::from_secs(4)) +} diff --git a/tvix/castore/src/directoryservice/combinators.rs b/tvix/castore/src/directoryservice/combinators.rs index d3f351d6b..2364a313d 100644 --- a/tvix/castore/src/directoryservice/combinators.rs +++ b/tvix/castore/src/directoryservice/combinators.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use futures::stream::BoxStream; use futures::StreamExt; use futures::TryFutureExt; @@ -6,6 +8,7 @@ use tonic::async_trait; use tracing::{instrument, trace}; use super::{DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter}; +use crate::composition::{CompositionContext, ServiceBuilder}; use crate::directoryservice::DirectoryPutter; use crate::proto; use crate::B3Digest; @@ -140,3 +143,29 @@ where Box::new(SimplePutter::new((*self).clone())) } } + +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct CacheConfig { + near: String, + far: String, +} + +#[async_trait] +impl ServiceBuilder for CacheConfig { + type Output = dyn DirectoryService; + async fn build<'a>( + &'a self, + _instance_name: &str, + context: &CompositionContext, + ) -> Result, Box> { + let (near, far) = futures::join!( + context.resolve(self.near.clone()), + context.resolve(self.far.clone()) + ); + Ok(Arc::new(Cache { + near: near?, + far: far?, + })) + } +} diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index ca9b0de07..e2ca3954c 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -1,10 +1,12 @@ use std::collections::HashSet; use super::{DirectoryPutter, DirectoryService}; +use crate::composition::{CompositionContext, ServiceBuilder}; use crate::proto::{self, get_directory_request::ByWhat}; use crate::{B3Digest, Error}; use async_stream::try_stream; use futures::stream::BoxStream; +use std::sync::Arc; use tokio::spawn; use tokio::sync::mpsc::UnboundedSender; use tokio::task::JoinHandle; @@ -216,6 +218,27 @@ where } } +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct GRPCDirectoryServiceConfig { + url: String, +} + +#[async_trait] +impl ServiceBuilder for GRPCDirectoryServiceConfig { + type Output = dyn DirectoryService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result, Box> { + let client = proto::directory_service_client::DirectoryServiceClient::new( + crate::tonic::channel_from_url(&self.url.parse()?).await?, + ); + Ok(Arc::new(GRPCDirectoryService::from_client(client))) + } +} + /// Allows uploading multiple Directory messages in the same gRPC stream. pub struct GRPCPutter { /// Data about the current request - a handle to the task, and the tx part diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs index 3b2795c39..f12ef6e97 100644 --- a/tvix/castore/src/directoryservice/memory.rs +++ b/tvix/castore/src/directoryservice/memory.rs @@ -8,6 +8,7 @@ use tracing::{instrument, warn}; use super::utils::traverse_directory; use super::{DirectoryPutter, DirectoryService, SimplePutter}; +use crate::composition::{CompositionContext, ServiceBuilder}; #[derive(Clone, Default)] pub struct MemoryDirectoryService { @@ -85,3 +86,19 @@ impl DirectoryService for MemoryDirectoryService { Box::new(SimplePutter::new(self.clone())) } } + +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct MemoryDirectoryServiceConfig {} + +#[async_trait] +impl ServiceBuilder for MemoryDirectoryServiceConfig { + type Output = dyn DirectoryService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result, Box> { + Ok(Arc::new(MemoryDirectoryService::default())) + } +} diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs index eff4a685f..815502c81 100644 --- a/tvix/castore/src/directoryservice/mod.rs +++ b/tvix/castore/src/directoryservice/mod.rs @@ -1,7 +1,8 @@ +use crate::composition::{Registry, ServiceBuilder}; use crate::{proto, B3Digest, Error}; + use futures::stream::BoxStream; use tonic::async_trait; - mod combinators; mod directory_graph; mod from_addr; @@ -16,12 +17,12 @@ pub mod tests; mod traverse; mod utils; -pub use self::combinators::Cache; +pub use self::combinators::{Cache, CacheConfig}; pub use self::directory_graph::DirectoryGraph; pub use self::from_addr::from_addr; -pub use self::grpc::GRPCDirectoryService; -pub use self::memory::MemoryDirectoryService; -pub use self::object_store::ObjectStoreDirectoryService; +pub use self::grpc::{GRPCDirectoryService, GRPCDirectoryServiceConfig}; +pub use self::memory::{MemoryDirectoryService, MemoryDirectoryServiceConfig}; +pub use self::object_store::{ObjectStoreDirectoryService, ObjectStoreDirectoryServiceConfig}; pub use self::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator}; pub use self::simple_putter::SimplePutter; pub use self::sled::SledDirectoryService; @@ -32,7 +33,7 @@ pub use self::utils::traverse_directory; mod bigtable; #[cfg(feature = "cloud")] -pub use self::bigtable::BigtableDirectoryService; +pub use self::bigtable::{BigtableDirectoryService, BigtableParameters}; /// The base trait all Directory services need to implement. /// This is a simple get and put of [crate::proto::Directory], returning their @@ -126,3 +127,15 @@ pub trait DirectoryPutter: Send { /// be returned. async fn close(&mut self) -> Result; } + +/// Registers the builtin DirectoryService implementations with the registry +pub(crate) fn register_directory_services(reg: &mut Registry) { + reg.register::>, super::directoryservice::ObjectStoreDirectoryServiceConfig>("objectstore"); + reg.register::>, super::directoryservice::MemoryDirectoryServiceConfig>("memory"); + reg.register::>, super::directoryservice::CacheConfig>("cache"); + reg.register::>, super::directoryservice::GRPCDirectoryServiceConfig>("grpc"); + #[cfg(feature = "cloud")] + { + reg.register::>, super::directoryservice::BigtableParameters>("bigtable"); + } +} diff --git a/tvix/castore/src/directoryservice/object_store.rs b/tvix/castore/src/directoryservice/object_store.rs index feaaaa39c..1977de18f 100644 --- a/tvix/castore/src/directoryservice/object_store.rs +++ b/tvix/castore/src/directoryservice/object_store.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::sync::Arc; use data_encoding::HEXLOWER; @@ -18,6 +19,7 @@ use url::Url; use super::{ DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, RootToLeavesValidator, }; +use crate::composition::{CompositionContext, ServiceBuilder}; use crate::{proto, B3Digest, Error}; /// Stores directory closures in an object store. @@ -46,7 +48,7 @@ fn derive_dirs_path(base_path: &Path, digest: &B3Digest) -> Path { const MAX_FRAME_LENGTH: usize = 1 * 1024 * 1024 * 1000; // 1 MiB // impl ObjectStoreDirectoryService { - /// Constructs a new [ObjectStoreBlobService] from a [Url] supported by + /// Constructs a new [ObjectStoreDirectoryService] from a [Url] supported by /// [object_store]. /// Any path suffix becomes the base path of the object store. /// additional options, the same as in [object_store::parse_url_opts] can @@ -169,6 +171,33 @@ impl DirectoryService for ObjectStoreDirectoryService { } } +#[derive(serde::Deserialize)] +#[serde(deny_unknown_fields)] +pub struct ObjectStoreDirectoryServiceConfig { + object_store_url: String, + #[serde(default)] + object_store_options: HashMap, +} + +#[async_trait] +impl ServiceBuilder for ObjectStoreDirectoryServiceConfig { + type Output = dyn DirectoryService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result, Box> { + let (object_store, path) = object_store::parse_url_opts( + &self.object_store_url.parse()?, + &self.object_store_options, + )?; + Ok(Arc::new(ObjectStoreDirectoryService { + object_store: Arc::new(object_store), + base_path: path, + })) + } +} + struct ObjectStoreDirectoryPutter { object_store: Arc, base_path: Path, diff --git a/tvix/castore/src/lib.rs b/tvix/castore/src/lib.rs index bdc533a8c..4fca9801c 100644 --- a/tvix/castore/src/lib.rs +++ b/tvix/castore/src/lib.rs @@ -3,6 +3,7 @@ mod errors; mod hashing_reader; pub mod blobservice; +pub mod composition; pub mod directoryservice; pub mod fixtures;