use std::collections::{HashSet, VecDeque};

use tracing::{debug_span, instrument, warn};

use crate::{proto, Error};

feat(tvix/store/directorysvc): add gRPC client

This provides a GRPCDirectoryService struct implementing
DirectoryService, allowing a client to retrieve Directory objects from a
(remote) tvix-store. Remote in this case is anything outside the current
process, be it another process, or an endpoint on the network.

To keep the sync interface in the `DirectoryService` trait, a handle to
some tokio runtime needs to be passed into the constructor, and the two
methods use `self.tokio_handle.spawn` to start an async function, and
`self.tokio_handle.block_on` to wait for its completion (a minimal sketch
of this pattern follows below).

The client handle itself, called `grpc_client`, is easy to clone and
handles concurrent requests internally. This means that even though we
keep the `DirectoryService` trait sync, nothing prevents it from being
used concurrently, say from multiple threads.

There are still two limitations for now:

1) The trait doesn't make use of the `recursive` request, which
   currently leads to an N+1 query problem. This can be fixed by
   `GRPCDirectoryService` holding a reference to another
   `DirectoryService` acting as the local side.
   I want to wait for general store composition code to pop up before
   manually coding this here.

2) It's currently only possible to put() leaf directory nodes, as the
   request normally requires uploading a whole closure. We might want
   to add another batch function to upload a whole closure, and/or do
   this batching in certain cases. This still needs some more thinking.

Change-Id: I7ffec791610b72c0960cf5307cefbb12ec946dc9
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8336
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: tazjin <tazjin@tvl.su>
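
A minimal sketch of that spawn/block_on bridge, assuming the constructor stored a
`tokio::runtime::Handle` as `tokio_handle`; the helper name `call_blocking` and its
exact shape are illustrative, not the actual grpc.rs code:

use tokio::runtime::Handle;

// Hypothetical helper, not part of the actual implementation: run an async
// request to completion from a sync context, using a handle to a tokio
// runtime that lives elsewhere in the process.
fn call_blocking<T, Fut>(tokio_handle: &Handle, fut: Fut) -> Result<T, Error>
where
    T: Send + 'static,
    Fut: std::future::Future<Output = Result<T, Error>> + Send + 'static,
{
    // Start the async function on the runtime...
    let join_handle = tokio_handle.spawn(fut);

    // ...and block the current (sync) thread until it completes.
    tokio_handle
        .block_on(join_handle)
        .map_err(|e| Error::StorageError(format!("failed to join task: {}", e)))?
}
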
mod grpc;
mod memory;
mod sled;

pub use self::grpc::GRPCDirectoryService;
pub use self::memory::MemoryDirectoryService;
pub use self::sled::SledDirectoryService;

/// The base trait all Directory services need to implement.
///
/// This is a simple get and put of [crate::proto::Directory], returning their
/// digest.
pub trait DirectoryService {
    type DirectoriesIterator: Iterator<Item = Result<proto::Directory, Error>> + Send;

    /// Get looks up a single Directory message by its digest.
    /// In case the directory is not found, Ok(None) is returned.
    fn get(&self, digest: &[u8; 32]) -> Result<Option<proto::Directory>, Error>;

    /// Put uploads a single Directory message, and returns the calculated
    /// digest, or an error.
    fn put(&self, directory: proto::Directory) -> Result<[u8; 32], Error>;

    /// Looks up a closure of [proto::Directory].
    /// Ideally this would be an `impl Iterator<Item = Result<proto::Directory, Error>>`,
    /// and we'd be able to add a default implementation for it here, but
    /// we can't have that yet.
    fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator;
}
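
// Illustrative usage sketch, not part of the original file: exercises the
// DirectoryService trait through MemoryDirectoryService. It assumes
// MemoryDirectoryService implements Default and that an empty
// proto::Directory is accepted by put().
#[cfg(test)]
mod directory_service_usage_sketch {
    use super::{DirectoryService, MemoryDirectoryService};
    use crate::proto;

    #[test]
    fn put_get_roundtrip() {
        let svc = MemoryDirectoryService::default();

        // put() returns the digest of the uploaded Directory message.
        let digest = svc
            .put(proto::Directory::default())
            .expect("put must succeed");

        // get() returns Ok(Some(..)) for known digests, Ok(None) otherwise.
        assert!(svc.get(&digest).expect("get must succeed").is_some());

        // get_recursive() yields the closure below the root; a leaf directory
        // has a closure of exactly itself.
        let closure: Vec<proto::Directory> = svc
            .get_recursive(&digest)
            .collect::<Result<_, _>>()
            .expect("all directories must resolve");
        assert_eq!(closure.len(), 1);
    }
}
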
/// Traverses a [proto::Directory] from the root to the children.
///
/// This is mostly BFS, but directories are only returned once.
pub struct DirectoryTraverser<DS: DirectoryService> {
    directory_service: DS,
    /// The list of all directories that still need to be traversed. The next
    /// element is picked from the front, new elements are enqueued at the
    /// back.
    worklist_directory_digests: VecDeque<[u8; 32]>,
    /// The list of directory digests already sent to the consumer.
    /// We omit sending the same directories multiple times.
    sent_directory_digests: HashSet<[u8; 32]>,
}

impl<DS: DirectoryService> DirectoryTraverser<DS> {
    pub fn with(directory_service: DS, root_directory_digest: &[u8; 32]) -> Self {
        Self {
            directory_service,
            worklist_directory_digests: VecDeque::from([*root_directory_digest]),
            sent_directory_digests: HashSet::new(),
        }
    }

    // enqueue all child directory digests to the work queue, as
    // long as they're not part of the worklist or already sent.
    // This panics if a digest looks invalid; it's supposed to be checked first.
    fn enqueue_child_directories(&mut self, directory: &proto::Directory) {
        for child_directory_node in &directory.directories {
            let child_digest: [u8; 32] = child_directory_node
                .digest
                .as_slice()
                .try_into()
                .map_err(|_e| Error::StorageError("invalid digest length".to_string()))
                .unwrap();

            if self.worklist_directory_digests.contains(&child_digest)
                || self.sent_directory_digests.contains(&child_digest)
            {
                continue;
            }
            self.worklist_directory_digests.push_back(child_digest);
        }
    }
}

impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> {
    type Item = Result<proto::Directory, Error>;

    #[instrument(skip_all)]
    fn next(&mut self) -> Option<Self::Item> {
        // fetch the next directory digest from the front of the work queue.
        match self.worklist_directory_digests.pop_front() {
            None => None,
            Some(current_directory_digest) => {
                let current_directory_b64 =
                    data_encoding::BASE64.encode(&current_directory_digest);
                let span = debug_span!("directory.digest", current_directory_b64);
                // keep the guard alive, so the span stays entered for the rest of this call.
                let _enter = span.enter();

                // look up the directory itself.
                let current_directory = match self.directory_service.get(&current_directory_digest)
                {
                    // if we got it
                    Ok(Some(current_directory)) => {
                        // validate, we don't want to send invalid directories.
                        if let Err(e) = current_directory.validate() {
                            warn!("directory failed validation: {}", e.to_string());
                            return Some(Err(Error::StorageError(format!(
                                "invalid directory: {}",
                                current_directory_b64
                            ))));
                        }
                        current_directory
                    }
                    // if it's not there, we have an inconsistent store!
                    Ok(None) => {
                        warn!("directory {} does not exist", current_directory_b64);
                        return Some(Err(Error::StorageError(format!(
                            "directory {} does not exist",
                            current_directory_b64
                        ))));
                    }
                    Err(e) => {
                        warn!("failed to look up directory");
                        return Some(Err(Error::StorageError(format!(
                            "unable to look up directory {}: {}",
                            current_directory_b64, e
                        ))));
                    }
                };

                // All DirectoryServices MUST validate directory nodes before returning them, so we
                // can be sure [enqueue_child_directories] doesn't panic.

                // enqueue child directories
                self.enqueue_child_directories(&current_directory);

                // remember that this directory has been sent to the consumer,
                // so it's not yielded a second time.
                self.sent_directory_digests.insert(current_directory_digest);

                Some(Ok(current_directory))
            }
        }
    }
}
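
// Illustrative sketch, not from the original file: driving DirectoryTraverser
// directly instead of going through DirectoryService::get_recursive. The use of
// MemoryDirectoryService::default() is an assumption.
#[cfg(test)]
mod directory_traverser_usage_sketch {
    use super::{DirectoryService, DirectoryTraverser, MemoryDirectoryService};
    use crate::proto;

    #[test]
    fn traverse_single_directory() {
        let svc = MemoryDirectoryService::default();
        let root_digest = svc
            .put(proto::Directory::default())
            .expect("put must succeed");

        // BFS over the closure rooted at root_digest; each directory is
        // yielded at most once, even if referenced multiple times.
        let mut traverser = DirectoryTraverser::with(svc, &root_digest);

        assert!(matches!(traverser.next(), Some(Ok(_))));
        assert!(traverser.next().is_none());
    }
}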