diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs index e8ab854fd..3c76d9e31 100644 --- a/tvix/store/src/directoryservice/grpc.rs +++ b/tvix/store/src/directoryservice/grpc.rs @@ -1,8 +1,11 @@ use std::collections::HashSet; -use super::DirectoryService; +use super::{DirectoryPutter, DirectoryService}; use crate::proto::{self, get_directory_request::ByWhat}; +use crate::Error; use data_encoding::BASE64; +use tokio::sync::mpsc::UnboundedSender; +use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::{transport::Channel, Status}; use tonic::{Code, Streaming}; use tracing::{instrument, warn}; @@ -85,9 +88,6 @@ impl DirectoryService for GRPCDirectoryService { fn put(&self, directory: crate::proto::Directory) -> Result<[u8; 32], crate::Error> { let mut grpc_client = self.grpc_client.clone(); - // TODO: this currently doesn't work for directories referring to other - // directories, as we're required to upload the whole closure all the - // time. let task = self .tokio_handle .spawn(async move { grpc_client.put(tokio_stream::iter(vec![directory])).await }); @@ -98,7 +98,9 @@ impl DirectoryService for GRPCDirectoryService { .root_digest .as_slice() .try_into() - .unwrap()), // TODO: map error + .map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + })?), Err(e) => Err(crate::Error::StorageError(e.to_string())), } } @@ -125,6 +127,30 @@ impl DirectoryService for GRPCDirectoryService { StreamIterator::new(self.tokio_handle.clone(), &root_directory_digest, stream) } + + type DirectoryPutter = GRPCPutter; + + #[instrument(skip_all)] + fn put_multiple_start(&self) -> Self::DirectoryPutter + where + Self: Clone, + { + let mut grpc_client = self.grpc_client.clone(); + + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + + let task: tokio::task::JoinHandle> = + self.tokio_handle.spawn(async move { + let s = grpc_client + .put(UnboundedReceiverStream::new(rx)) + .await? + .into_inner(); + + Ok(s) + }); + + GRPCPutter::new(self.tokio_handle.clone(), tx, task) + } } pub struct StreamIterator { @@ -184,9 +210,10 @@ impl Iterator for StreamIterator { self.received_directory_digests.insert(directory_digest); // register all children in expected_directory_digests. - for child_directories in &directory.directories { + // We ran validate() above, so we know these digests must be correct. + for child_directory in &directory.directories { self.expected_directory_digests - .insert(child_directories.digest.clone().try_into().unwrap()); + .insert(child_directory.digest.clone().try_into().unwrap()); } Some(Ok(directory)) @@ -208,6 +235,88 @@ impl Iterator for StreamIterator { } } +/// Allows uploading multiple Directory messages in the same gRPC stream. +pub struct GRPCPutter { + /// A handle into the active tokio runtime. Necessary to spawn tasks. + tokio_handle: tokio::runtime::Handle, + + /// Data about the current request - a handle to the task, and the tx part + /// of the channel. + /// The tx part of the pipe is used to send [proto::Directory] to the ongoing request. + /// The task will yield a [proto::PutDirectoryResponse] once the stream is closed. + #[allow(clippy::type_complexity)] // lol + rq: Option<( + tokio::task::JoinHandle>, + UnboundedSender, + )>, +} + +impl GRPCPutter { + pub fn new( + tokio_handle: tokio::runtime::Handle, + directory_sender: UnboundedSender, + task: tokio::task::JoinHandle>, + ) -> Self { + Self { + tokio_handle, + rq: Some((task, directory_sender)), + } + } + + #[allow(dead_code)] + // allows checking if the tx part of the channel is closed. + fn is_closed(&self) -> bool { + match self.rq { + None => true, + Some((_, ref directory_sender)) => directory_sender.is_closed(), + } + } +} + +impl DirectoryPutter for GRPCPutter { + fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> { + match self.rq { + // If we're not already closed, send the directory to directory_sender. + Some((_, ref directory_sender)) => { + if directory_sender.send(directory).is_err() { + // If the channel has been prematurely closed, invoke close (so we can peek at the error code) + // That error code is much more helpful, because it + // contains the error message from the server. + self.close()?; + } + Ok(()) + } + // If self.close() was already called, we can't put again. + None => Err(Error::StorageError( + "DirectoryPutter already closed".to_string(), + )), + } + } + + /// Closes the stream for sending, and returns the value + fn close(&mut self) -> Result<[u8; 32], crate::Error> { + // get self.rq, and replace it with None. + // This ensures we can only close it once. + match std::mem::take(&mut self.rq) { + None => Err(Error::StorageError("already closed".to_string())), + Some((task, directory_sender)) => { + // close directory_sender, so blocking on task will finish. + drop(directory_sender); + + Ok(self + .tokio_handle + .block_on(task)? + .map_err(|e| Error::StorageError(e.to_string()))? + .root_digest + .try_into() + .map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + })?) + } + } + } +} + #[cfg(test)] mod tests { use core::time; @@ -219,10 +328,13 @@ mod tests { use tonic::transport::{Endpoint, Server, Uri}; use crate::{ - directoryservice::DirectoryService, + directoryservice::{DirectoryPutter, DirectoryService}, proto, proto::{directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper}, - tests::{fixtures::DIRECTORY_A, utils::gen_directory_service}, + tests::{ + fixtures::{DIRECTORY_A, DIRECTORY_B}, + utils::gen_directory_service, + }, }; #[test] @@ -260,8 +372,22 @@ mod tests { .build() .unwrap(); - // TODO: wait for the socket to be created - std::thread::sleep(time::Duration::from_millis(200)); + // wait for the socket to be created + { + let mut socket_created = false; + for _try in 1..20 { + if socket_path.exists() { + socket_created = true; + break; + } + std::thread::sleep(time::Duration::from_millis(20)) + } + + assert!( + socket_created, + "expected socket path to eventually get created, but never happened" + ); + } let task = tester_runtime.spawn_blocking(move || { // Create a channel, connecting to the uds at socket_path. @@ -301,8 +427,69 @@ mod tests { .get(&DIRECTORY_A.digest()) .expect("must succeed") .expect("must be some") - ) + ); + + // Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A. + directory_service + .put(DIRECTORY_B.clone()) + .expect_err("must fail"); + + // Uploading A and then B should succeed, and closing should return the digest of B. + let mut handle = directory_service.put_multiple_start(); + handle.put(DIRECTORY_A.clone()).expect("must succeed"); + handle.put(DIRECTORY_B.clone()).expect("must succeed"); + let digest = handle.close().expect("must succeed"); + assert_eq!(DIRECTORY_B.digest(), digest); + + // Now try to retrieve the closure of DIRECTORY_B, which should return B and then A. + let mut directories_it = directory_service.get_recursive(&DIRECTORY_B.digest()); + assert_eq!( + DIRECTORY_B.clone(), + directories_it + .next() + .expect("must be some") + .expect("must succeed") + ); + assert_eq!( + DIRECTORY_A.clone(), + directories_it + .next() + .expect("must be some") + .expect("must succeed") + ); + + // Uploading B and then A should fail during close (if we're a + // fast client) + let mut handle = directory_service.put_multiple_start(); + handle.put(DIRECTORY_B.clone()).expect("must succeed"); + handle.put(DIRECTORY_A.clone()).expect("must succeed"); + handle.close().expect_err("must fail"); + + // Below test is a bit timing sensitive. We send B (which refers to + // A, so should fail), and wait sufficiently enough for the server + // to close us the stream, + // and then assert that uploading anything else via the handle will fail. + + let mut handle = directory_service.put_multiple_start(); + handle.put(DIRECTORY_B.clone()).expect("must succeed"); + + let mut is_closed = false; + for _try in 1..20 { + if handle.is_closed() { + is_closed = true; + break; + } + std::thread::sleep(time::Duration::from_millis(200)) + } + + assert!( + is_closed, + "expected channel to eventually close, but never happened" + ); + + handle.put(DIRECTORY_A.clone()).expect_err("must fail"); }); + tester_runtime.block_on(task)?; Ok(()) diff --git a/tvix/store/src/directoryservice/memory.rs b/tvix/store/src/directoryservice/memory.rs index 3d7351033..2b4668a15 100644 --- a/tvix/store/src/directoryservice/memory.rs +++ b/tvix/store/src/directoryservice/memory.rs @@ -4,6 +4,7 @@ use std::collections::HashMap; use std::sync::{Arc, RwLock}; use tracing::{instrument, warn}; +use super::utils::SimplePutter; use super::{DirectoryService, DirectoryTraverser}; #[derive(Clone, Default)] @@ -74,4 +75,14 @@ impl DirectoryService for MemoryDirectoryService { fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator { DirectoryTraverser::with(self.clone(), root_directory_digest) } + + type DirectoryPutter = SimplePutter; + + #[instrument(skip_all)] + fn put_multiple_start(&self) -> Self::DirectoryPutter + where + Self: Clone, + { + SimplePutter::new(self.clone()) + } } diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs index 53f6f08b1..05feb85d4 100644 --- a/tvix/store/src/directoryservice/mod.rs +++ b/tvix/store/src/directoryservice/mod.rs @@ -14,6 +14,7 @@ pub use self::utils::DirectoryTraverser; /// digest. pub trait DirectoryService { type DirectoriesIterator: Iterator> + Send; + type DirectoryPutter: DirectoryPutter; /// Get looks up a single Directory message by its digest. /// In case the directory is not found, Ok(None) is returned. @@ -27,4 +28,25 @@ pub trait DirectoryService { /// and we'd be able to add a default implementation for it here, but /// we can't have that yet. fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator; + + /// Allows persisting a closure of [proto::Directory], which is a graph of + /// connected Directory messages. + fn put_multiple_start(&self) -> Self::DirectoryPutter; +} + +/// Provides a handle to put a closure of connected [proto::Directory] elements. +/// +/// The consumer can periodically call [put], starting from the leaves. Once +/// the root is reached, [close] can be called to retrieve the root digest (or +/// an error). +pub trait DirectoryPutter { + /// Put a individual [proto::Directory] into the store. + /// Error semantics and behaviour is up to the specific implementation of + /// this trait. + /// Due to bursting, the returned error might refer to an object previously + /// sent via `put`. + fn put(&mut self, directory: proto::Directory) -> Result<(), Error>; + + /// Close the stream, and wait for any errors. + fn close(&mut self) -> Result<[u8; 32], Error>; } diff --git a/tvix/store/src/directoryservice/sled.rs b/tvix/store/src/directoryservice/sled.rs index 44d61ae42..d06023230 100644 --- a/tvix/store/src/directoryservice/sled.rs +++ b/tvix/store/src/directoryservice/sled.rs @@ -5,6 +5,7 @@ use prost::Message; use std::path::PathBuf; use tracing::{instrument, warn}; +use super::utils::SimplePutter; use super::{DirectoryService, DirectoryTraverser}; #[derive(Clone)] @@ -97,4 +98,14 @@ impl DirectoryService for SledDirectoryService { fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator { DirectoryTraverser::with(self.clone(), root_directory_digest) } + + type DirectoryPutter = SimplePutter; + + #[instrument(skip_all)] + fn put_multiple_start(&self) -> Self::DirectoryPutter + where + Self: Clone, + { + SimplePutter::new(self.clone()) + } } diff --git a/tvix/store/src/directoryservice/utils.rs b/tvix/store/src/directoryservice/utils.rs index 10edb9ba7..7d41e8d3b 100644 --- a/tvix/store/src/directoryservice/utils.rs +++ b/tvix/store/src/directoryservice/utils.rs @@ -1,3 +1,4 @@ +use super::DirectoryPutter; use super::DirectoryService; use crate::proto; use crate::Error; @@ -104,3 +105,40 @@ impl Iterator for DirectoryTraverser { } } } + +/// This is a simple implementation of a Directory uploader. +/// TODO: verify connectivity? Factor out these checks into generic helpers? +pub struct SimplePutter { + directory_service: DS, + last_directory_digest: Option<[u8; 32]>, +} + +impl SimplePutter { + pub fn new(directory_service: DS) -> Self { + Self { + directory_service, + last_directory_digest: None, + } + } +} + +impl DirectoryPutter for SimplePutter { + fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + let digest = self.directory_service.put(directory)?; + + // track the last directory digest + self.last_directory_digest = Some(digest); + + Ok(()) + } + + /// We need to be mutable here, as that's the signature of the trait. + fn close(&mut self) -> Result<[u8; 32], Error> { + match self.last_directory_digest { + Some(last_digest) => Ok(last_digest), + None => Err(Error::InvalidRequest( + "no directories sent, can't show root digest".to_string(), + )), + } + } +}