refactor(tvix/castore/directorysvc): factor out gRPC client gen

Move this code into a helper function, which we'll use in other places
in a bit.

Change-Id: Icae6f6dd2d4b2fa86fd2b836ddd7a4ca0e0354e7
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9559
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
This commit is contained in:
Florian Klink 2023-10-07 08:35:31 +02:00 committed by flokli
parent 31f28b6105
commit a77914db73
2 changed files with 180 additions and 175 deletions

View file

@ -338,202 +338,136 @@ impl DirectoryPutter for GRPCPutter {
#[cfg(test)]
mod tests {
use core::time;
use std::thread;
use futures::StreamExt;
use tempfile::TempDir;
use tokio::net::{UnixListener, UnixStream};
use tokio_stream::wrappers::UnixListenerStream;
use tonic::transport::{Endpoint, Server, Uri};
use crate::{
directoryservice::DirectoryService,
fixtures::{DIRECTORY_A, DIRECTORY_B},
proto,
proto::{directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper},
utils::gen_directory_service,
utils::gen_directorysvc_grpc_client,
};
#[test]
fn test() {
let tmpdir = TempDir::new().unwrap();
let socket_path = tmpdir.path().join("socket");
#[tokio::test]
async fn test() {
let tempdir = TempDir::new().expect("must succeed");
// create the GrpcDirectoryService
let directory_service = super::GRPCDirectoryService::from_client(
gen_directorysvc_grpc_client(tempdir.path()).await,
);
// Spin up a server, in a thread far away, which spawns its own tokio runtime,
// and blocks on the task.
let socket_path_clone = socket_path.clone();
thread::spawn(move || {
// Create the runtime
let rt = tokio::runtime::Runtime::new().unwrap();
// Get a handle from this runtime
let handle = rt.handle();
// try to get DIRECTORY_A should return Ok(None)
assert_eq!(
None,
directory_service
.get(&DIRECTORY_A.digest())
.await
.expect("must not fail")
);
let task = handle.spawn(async {
let uds = UnixListener::bind(socket_path_clone).unwrap();
let uds_stream = UnixListenerStream::new(uds);
// Now upload it
assert_eq!(
DIRECTORY_A.digest(),
directory_service
.put(DIRECTORY_A.clone())
.await
.expect("must succeed")
);
// spin up a new DirectoryService
let mut server = Server::builder();
let router = server.add_service(DirectoryServiceServer::new(
GRPCDirectoryServiceWrapper::from(gen_directory_service()),
));
router.serve_with_incoming(uds_stream).await
});
// And retrieve it, compare for equality.
assert_eq!(
DIRECTORY_A.clone(),
directory_service
.get(&DIRECTORY_A.digest())
.await
.expect("must succeed")
.expect("must be some")
);
handle.block_on(task)
});
// Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A.
directory_service
.put(DIRECTORY_B.clone())
.await
.expect_err("must fail");
// set up the local client runtime. This is similar to what the [tokio:test] macro desugars to.
let tester_runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
// wait for the socket to be created
// Putting DIRECTORY_B in a put_multiple will succeed, but the close
// will always fail.
{
let mut socket_created = false;
for _try in 1..20 {
if socket_path.exists() {
socket_created = true;
let mut handle = directory_service.put_multiple_start();
handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
handle.close().await.expect_err("must fail");
}
// Uploading A and then B should succeed, and closing should return the digest of B.
let mut handle = directory_service.put_multiple_start();
handle.put(DIRECTORY_A.clone()).await.expect("must succeed");
handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
let digest = handle.close().await.expect("must succeed");
assert_eq!(DIRECTORY_B.digest(), digest);
// Now try to retrieve the closure of DIRECTORY_B, which should return B and then A.
let mut directories_it = directory_service.get_recursive(&DIRECTORY_B.digest());
assert_eq!(
DIRECTORY_B.clone(),
directories_it
.next()
.await
.expect("must be some")
.expect("must succeed")
);
assert_eq!(
DIRECTORY_A.clone(),
directories_it
.next()
.await
.expect("must be some")
.expect("must succeed")
);
// Uploading B and then A should fail, because B refers to A, which
// hasn't been uploaded yet.
// However, the client can burst, so we might not have received the
// error back from the server.
{
let mut handle = directory_service.put_multiple_start();
// sending out B will always be fine
handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
// whether we will be able to put A as well depends on whether we
// already received the error about B.
if handle.put(DIRECTORY_A.clone()).await.is_ok() {
// If we didn't, and this was Ok(_), …
// a subsequent close MUST fail (because it waits for the
// server)
handle.close().await.expect_err("must fail");
}
}
// Now we do the same test as before, send B, then A, but wait
// sufficiently enough for the server to have s
// to close us the stream,
// and then assert that uploading anything else via the handle will fail.
{
let mut handle = directory_service.put_multiple_start();
handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
let mut is_closed = false;
for _try in 1..1000 {
if handle.is_closed() {
is_closed = true;
break;
}
std::thread::sleep(time::Duration::from_millis(20))
tokio::time::sleep(time::Duration::from_millis(10)).await;
}
assert!(
socket_created,
"expected socket path to eventually get created, but never happened"
);
}
tester_runtime.block_on(async move {
// Create a channel, connecting to the uds at socket_path.
// The URI is unused.
let channel = Endpoint::try_from("http://[::]:50051")
.unwrap()
.connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
UnixStream::connect(socket_path.clone())
}));
let grpc_client = proto::directory_service_client::DirectoryServiceClient::new(channel);
// create the GrpcDirectoryService, using the tester_runtime.
let directory_service = super::GRPCDirectoryService::from_client(grpc_client);
// try to get DIRECTORY_A should return Ok(None)
assert_eq!(
None,
directory_service
.get(&DIRECTORY_A.digest())
.await
.expect("must not fail")
is_closed,
"expected channel to eventually close, but never happened"
);
// Now upload it
assert_eq!(
DIRECTORY_A.digest(),
directory_service
.put(DIRECTORY_A.clone())
.await
.expect("must succeed")
);
// And retrieve it, compare for equality.
assert_eq!(
DIRECTORY_A.clone(),
directory_service
.get(&DIRECTORY_A.digest())
.await
.expect("must succeed")
.expect("must be some")
);
// Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A.
directory_service
.put(DIRECTORY_B.clone())
handle
.put(DIRECTORY_A.clone())
.await
.expect_err("must fail");
// Putting DIRECTORY_B in a put_multiple will succeed, but the close
// will always fail.
{
let mut handle = directory_service.put_multiple_start();
handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
handle.close().await.expect_err("must fail");
}
// Uploading A and then B should succeed, and closing should return the digest of B.
let mut handle = directory_service.put_multiple_start();
handle.put(DIRECTORY_A.clone()).await.expect("must succeed");
handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
let digest = handle.close().await.expect("must succeed");
assert_eq!(DIRECTORY_B.digest(), digest);
// Now try to retrieve the closure of DIRECTORY_B, which should return B and then A.
let mut directories_it = directory_service.get_recursive(&DIRECTORY_B.digest());
assert_eq!(
DIRECTORY_B.clone(),
directories_it
.next()
.await
.expect("must be some")
.expect("must succeed")
);
assert_eq!(
DIRECTORY_A.clone(),
directories_it
.next()
.await
.expect("must be some")
.expect("must succeed")
);
// Uploading B and then A should fail, because B refers to A, which
// hasn't been uploaded yet.
// However, the client can burst, so we might not have received the
// error back from the server.
{
let mut handle = directory_service.put_multiple_start();
// sending out B will always be fine
handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
// whether we will be able to put A as well depends on whether we
// already received the error about B.
if handle.put(DIRECTORY_A.clone()).await.is_ok() {
// If we didn't, and this was Ok(_), …
// a subsequent close MUST fail (because it waits for the
// server)
handle.close().await.expect_err("must fail");
}
}
// Now we do the same test as before, send B, then A, but wait
// sufficiently enough for the server to have s
// to close us the stream,
// and then assert that uploading anything else via the handle will fail.
{
let mut handle = directory_service.put_multiple_start();
handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
let mut is_closed = false;
for _try in 1..1000 {
if handle.is_closed() {
is_closed = true;
break;
}
tokio::time::sleep(time::Duration::from_millis(10)).await;
}
assert!(
is_closed,
"expected channel to eventually close, but never happened"
);
handle
.put(DIRECTORY_A.clone())
.await
.expect_err("must fail");
}
});
}
}
}

View file

@ -3,11 +3,20 @@
//! Only used for testing purposes, but across crates.
//! Should be removed once we have a better concept of a "Service registry".
use std::sync::Arc;
use core::time;
use std::{path::Path, sync::Arc, thread};
use tokio::net::{UnixListener, UnixStream};
use tokio_stream::wrappers::UnixListenerStream;
use tonic::transport::{Channel, Endpoint, Server, Uri};
use crate::{
blobservice::{BlobService, MemoryBlobService},
directoryservice::{DirectoryService, MemoryDirectoryService},
proto::{
directory_service_client::DirectoryServiceClient,
directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper,
},
};
pub fn gen_blob_service() -> Arc<dyn BlobService> {
@ -17,3 +26,65 @@ pub fn gen_blob_service() -> Arc<dyn BlobService> {
pub fn gen_directory_service() -> Arc<dyn DirectoryService> {
Arc::new(MemoryDirectoryService::default())
}
/// This will spawn a separate thread, with its own tokio runtime, and start a gRPC server there.
/// Once it's listening, it'll start a gRPC client from the original thread, and return it.
/// FUTUREWORK: accept a closure to create the service, so we can test this with different ones.
#[allow(dead_code)]
pub(crate) async fn gen_directorysvc_grpc_client(tmpdir: &Path) -> DirectoryServiceClient<Channel> {
let socket_path = tmpdir.join("socket");
// Spin up a server, in a thread far away, which spawns its own tokio runtime,
// and blocks on the task.
let socket_path_clone = socket_path.clone();
thread::spawn(move || {
// Create the runtime
let rt = tokio::runtime::Runtime::new().unwrap();
// Get a handle from this runtime
let handle = rt.handle();
let task = handle.spawn(async {
let uds = UnixListener::bind(socket_path_clone).unwrap();
let uds_stream = UnixListenerStream::new(uds);
// spin up a new DirectoryService
let mut server = Server::builder();
let router = server.add_service(DirectoryServiceServer::new(
GRPCDirectoryServiceWrapper::from(gen_directory_service()),
));
router.serve_with_incoming(uds_stream).await
});
handle.block_on(task)
});
// wait for the socket to be created
// TODO: pass around FDs instead?
{
let mut socket_created = false;
for _try in 1..20 {
if socket_path.exists() {
socket_created = true;
break;
}
tokio::time::sleep(time::Duration::from_millis(20)).await;
}
assert!(
socket_created,
"expected socket path to eventually get created, but never happened"
);
}
// Create a channel, connecting to the uds at socket_path.
// The URI is unused.
let channel = Endpoint::try_from("http://[::]:50051")
.unwrap()
.connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
UnixStream::connect(socket_path.clone())
}));
let grpc_client = DirectoryServiceClient::new(channel);
grpc_client
}