2023-05-18 21:43:33 +03:00
|
|
|
use crate::{proto, B3Digest, Error};
|
2023-09-19 11:46:41 -05:00
|
|
|
use futures::Stream;
|
|
|
|
use std::pin::Pin;
|
|
|
|
use tonic::async_trait;
|
2023-06-09 23:35:46 +03:00
|
|
|
|
|
|
|
mod from_addr;
|
feat(tvix/store/directorysvc): add gRPC client
This provides a GRPCDirectoryService struct implementing
DirectoryService, allowing a client to access Directory objects from a (remote)
tvix-store.
Remote in this case is anything outside the current process, be it
another process, or an endpoint on the network.
To keep the sync interface in the `DirectoryService` trait, a handle to
some tokio runtime needs to be passed into the constructor, and the two
methods use `self.tokio_handle.spawn` to start an async function, and
`self.tokio_handle.block_on` to wait for its completion.
The client handle, called `grpc_client` itself is easy to clone, and
treats concurrent requests internally. This means, even though we keep
the `DirectoryService` trait sync, there's nothing preventing it from
being used concurrently, let's say from multiple threads.
There's still two limitations for now:
1) The trait doesn't make use of the `recursive` request, which
currently leads to a N+1 query problem. This can be fixed
by `GRPCDirectoryService` having a reference to another
`DirectoryService` acting as the local side.
I want to wait for general store composition code to pop up before
manually coding this here.
2) It's currently only possible to put() leaf directory nodes, as the
request normally requires uploading a whole closure. We might want
to add another batch function to upload a whole closure, and/or do
this batching in certain cases. This still needs some more thinking.
Change-Id: I7ffec791610b72c0960cf5307cefbb12ec946dc9
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8336
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: tazjin <tazjin@tvl.su>
2023-03-23 13:49:57 +01:00
|
|
|
mod grpc;
|
2023-02-12 14:24:43 +01:00
|
|
|
mod memory;
|
|
|
|
mod sled;
|
2023-05-14 14:49:53 +03:00
|
|
|
mod traverse;
|
2023-03-27 14:47:57 +02:00
|
|
|
mod utils;
|
2023-02-12 14:24:43 +01:00
|
|
|
|
2023-06-09 23:35:46 +03:00
|
|
|
pub use self::from_addr::from_addr;
|
feat(tvix/store/directorysvc): add gRPC client
This provides a GRPCDirectoryService struct implementing
DirectoryService, allowing a client to access Directory objects from a (remote)
tvix-store.
Remote in this case is anything outside the current process, be it
another process, or an endpoint on the network.
To keep the sync interface in the `DirectoryService` trait, a handle to
some tokio runtime needs to be passed into the constructor, and the two
methods use `self.tokio_handle.spawn` to start an async function, and
`self.tokio_handle.block_on` to wait for its completion.
The client handle, called `grpc_client` itself is easy to clone, and
treats concurrent requests internally. This means, even though we keep
the `DirectoryService` trait sync, there's nothing preventing it from
being used concurrently, let's say from multiple threads.
There's still two limitations for now:
1) The trait doesn't make use of the `recursive` request, which
currently leads to a N+1 query problem. This can be fixed
by `GRPCDirectoryService` having a reference to another
`DirectoryService` acting as the local side.
I want to wait for general store composition code to pop up before
manually coding this here.
2) It's currently only possible to put() leaf directory nodes, as the
request normally requires uploading a whole closure. We might want
to add another batch function to upload a whole closure, and/or do
this batching in certain cases. This still needs some more thinking.
Change-Id: I7ffec791610b72c0960cf5307cefbb12ec946dc9
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8336
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: tazjin <tazjin@tvl.su>
2023-03-23 13:49:57 +01:00
|
|
|
pub use self::grpc::GRPCDirectoryService;
|
2023-02-12 14:24:43 +01:00
|
|
|
pub use self::memory::MemoryDirectoryService;
|
|
|
|
pub use self::sled::SledDirectoryService;
|
2023-05-14 14:49:53 +03:00
|
|
|
pub use self::traverse::traverse_to;
|
2023-02-12 14:24:43 +01:00
|
|
|
|
|
|
|
/// The base trait all Directory services need to implement.
|
|
|
|
/// This is a simple get and put of [crate::proto::Directory], returning their
|
|
|
|
/// digest.
|
2023-09-19 11:46:41 -05:00
|
|
|
#[async_trait]
|
2023-06-09 12:26:34 +03:00
|
|
|
pub trait DirectoryService: Send + Sync {
|
2023-06-09 23:35:46 +03:00
|
|
|
/// Create a new instance by passing in a connection URL.
|
2023-09-19 11:46:41 -05:00
|
|
|
/// TODO: check if we want to make this async, instead of lazily connecting
|
2023-06-09 23:35:46 +03:00
|
|
|
fn from_url(url: &url::Url) -> Result<Self, Error>
|
|
|
|
where
|
|
|
|
Self: Sized;
|
|
|
|
|
2023-02-12 14:24:43 +01:00
|
|
|
/// Get looks up a single Directory message by its digest.
|
|
|
|
/// In case the directory is not found, Ok(None) is returned.
|
2023-09-19 11:46:41 -05:00
|
|
|
async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>;
|
2023-02-12 14:24:43 +01:00
|
|
|
/// Put uploads a single Directory message, and returns the calculated
|
|
|
|
|
/// digest, or an error.
|
2023-09-19 11:46:41 -05:00
|
|
|
async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>;
|
2023-03-26 13:51:48 +02:00
|
|
|
|
|
|
|
/// Looks up a closure of [proto::Directory].
|
2023-09-19 11:46:41 -05:00
|
|
|
/// Ideally this would be a `impl Stream<Item = Result<proto::Directory, Error>>`,
|
2023-03-26 13:51:48 +02:00
|
|
|
/// and we'd be able to add a default implementation for it here, but
|
|
|
|
|
/// we can't have that yet.
|
2023-09-19 11:46:41 -05:00
|
|
|
///
|
|
|
|
|
/// This returns a pinned, boxed stream. The pinning allows for it to be polled easily,
|
|
|
|
|
/// and the box allows different underlying stream implementations to be returned since
|
|
|
|
|
/// Rust doesn't support this as a generic in traits yet. This is the same thing that
|
|
|
|
|
/// [async_trait] generates, but for streams instead of futures.
|
2023-06-09 12:26:34 +03:00
|
|
|
fn get_recursive(
|
|
|
|
|
&self,
|
|
|
|
|
root_directory_digest: &B3Digest,
|
2023-09-19 11:46:41 -05:00
|
|
|
) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>>;
|
2023-03-27 17:08:16 +02:00
|
|
|
|
|
|
|
/// Allows persisting a closure of [proto::Directory], which is a graph of
|
|
|
|
|
/// connected Directory messages.
/// Returns a boxed [DirectoryPutter] handle, through which the individual
/// [proto::Directory] messages are sent.
|
2023-06-09 12:26:34 +03:00
|
|
|
fn put_multiple_start(&self) -> Box<dyn DirectoryPutter>;
|
2023-03-27 17:08:16 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Provides a handle to put a closure of connected [proto::Directory] elements.
|
|
|
|
///
|
2023-09-03 11:02:19 +03:00
|
|
|
/// The consumer can periodically call [DirectoryPutter::put], starting from the
|
|
|
|
/// leaves. Once the root is reached, [DirectoryPutter::close] can be called to
|
|
|
|
/// retrieve the root digest (or an error).
|
2023-09-19 11:46:41 -05:00
|
|
|
#[async_trait]
|
refactor(tvix/store/blobsvc): make BlobStore async
We previously kept the trait of a BlobService sync.
This however had some annoying consequences:
- It became more and more complicated to track when we're in a context
with an async runtime in the context or not, producing bugs like
https://b.tvl.fyi/issues/304
- The sync trait shielded away async clients from async workloads,
requiring manual block_on code inside the gRPC client code, and
spawn_blocking calls in consumers of the trait, even if they were
async (like the gRPC server)
- We had to write our own custom glue code (SyncReadIntoAsyncRead)
to convert a sync io::Read into a tokio::io::AsyncRead, which already
existed in tokio internally, but upstream is hesitant to expose.
This now makes the BlobService trait async (via the async_trait macro,
like we already do in various gRPC parts), and replaces the sync readers
and writers with their async counterparts.
Tests interacting with a BlobService now need to have an async runtime
available, the easiest way for this is to mark the test functions
with the tokio::test macro, allowing us to directly .await in the test
function.
In places where we don't have an async runtime available from context
(like tvix-cli), we can pass one down explicitly.
Now that we don't provide a sync interface anymore, the (sync) FUSE
library now holds a pointer to a tokio runtime handle, and needs to at
least have 2 threads available when talking to a blob service (which is
why some of the tests now use the multi_thread flavor).
The FUSE tests got a bit more verbose, as we couldn't use the
setup_and_mount function accepting a callback anymore. We can hopefully
move some of the test fixture setup to rstest in the future to make this
less repetitive.
Co-Authored-By: Connor Brewster <cbrewster@hey.com>
Change-Id: Ia0501b606e32c852d0108de9c9016b21c94a3c05
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9329
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
2023-09-13 14:20:21 +02:00
|
|
|
pub trait DirectoryPutter: Send {
|
2023-03-27 17:08:16 +02:00
|
|
|
/// Put an individual [proto::Directory] into the store.
|
|
|
|
/// Error semantics and behaviour are up to the specific implementation of
|
|
|
|
|
/// this trait.
|
|
|
|
|
/// Due to bursting, the returned error might refer to an object previously
|
|
|
|
|
/// sent via `put`.
|
2023-09-19 11:46:41 -05:00
|
|
|
async fn put(&mut self, directory: proto::Directory) -> Result<(), Error>;
|
2023-03-27 17:08:16 +02:00
|
|
|
|
|
|
|
/// Close the stream, and wait for any errors.
/// On success, the [B3Digest] of the root directory is returned.
|
2023-09-19 11:46:41 -05:00
|
|
|
async fn close(&mut self) -> Result<B3Digest, Error>;
|
2023-06-09 12:26:34 +03:00
|
|
|
|
|
|
|
/// Return whether the stream is closed or not.
|
|
|
|
|
/// Used from some [DirectoryService] implementations only.
|
|
|
|
|
fn is_closed(&self) -> bool;
|
2023-03-26 13:51:48 +02:00
|
|
|
}
|