refactor(tvix/castore): simplify test_valid_unix_path_ping_pong

We don't need to spawn two tokio runtimes anymore, and can do the URL
parsing at once, too.

Change-Id: I38ab96978cb7f8c31ded2726262e0b1366655094
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9566
Tested-by: BuildkiteCI
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Autosubmit: flokli <flokli@flokli.de>
This commit is contained in:
Florian Klink 2023-10-08 13:15:30 +02:00 committed by clbot
parent 269ab866f1
commit b196cbbc67

View file

@ -281,7 +281,6 @@ impl<W: tokio::io::AsyncWrite + Unpin> tokio::io::AsyncWrite for GRPCBlobWriter<
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::thread;
use tempfile::TempDir;
use tokio::net::UnixListener;
@ -350,77 +349,60 @@ mod tests {
assert!(GRPCBlobService::from_url(&url).is_err());
}
/// This uses the correct scheme for a unix socket, and provides a server on the other side.
/// This is not a tokio::test, because spawn two separate tokio runtimes and
// want to have explicit control.
#[test]
fn test_valid_unix_path_ping_pong() {
/// This ensures connecting via gRPC works as expected.
#[tokio::test]
async fn test_valid_unix_path_ping_pong() {
let tmpdir = TempDir::new().unwrap();
let path = tmpdir.path().join("daemon");
let socket_path = tmpdir.path().join("daemon");
let path_clone = path.clone();
let path_clone = socket_path.clone();
// Spin up a server, in a thread far away, which spawns its own tokio runtime,
// and blocks on the task.
thread::spawn(move || {
// Create the runtime
let rt = tokio::runtime::Runtime::new().unwrap();
// Spin up a server
tokio::spawn(async {
let uds = UnixListener::bind(path_clone).unwrap();
let uds_stream = UnixListenerStream::new(uds);
let task = rt.spawn(async {
let uds = UnixListener::bind(path_clone).unwrap();
let uds_stream = UnixListenerStream::new(uds);
// spin up a new server
let mut server = tonic::transport::Server::builder();
let router =
server.add_service(crate::proto::blob_service_server::BlobServiceServer::new(
GRPCBlobServiceWrapper::from(
Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>
),
));
router.serve_with_incoming(uds_stream).await
});
rt.block_on(task).unwrap().unwrap();
// spin up a new server
let mut server = tonic::transport::Server::builder();
let router =
server.add_service(crate::proto::blob_service_server::BlobServiceServer::new(
GRPCBlobServiceWrapper::from(
Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>
),
));
router.serve_with_incoming(uds_stream).await
});
// Now create another tokio runtime which we'll use in the main test code.
let rt = tokio::runtime::Runtime::new().unwrap();
let task = rt.spawn(async move {
// wait for the socket to be created
{
let mut socket_created = false;
// TODO: exponential backoff urgently
for _try in 1..20 {
if path.exists() {
socket_created = true;
break;
}
tokio::time::sleep(time::Duration::from_millis(20)).await;
// wait for the socket to be created
{
let mut socket_created = false;
// TODO: exponential backoff urgently
for _try in 1..20 {
if socket_path.exists() {
socket_created = true;
break;
}
assert!(
socket_created,
"expected socket path to eventually get created, but never happened"
);
tokio::time::sleep(time::Duration::from_millis(20)).await;
}
// prepare a client
let client = {
let mut url =
url::Url::parse("grpc+unix:///path/to/somewhere").expect("must parse");
url.set_path(path.to_str().unwrap());
GRPCBlobService::from_url(&url).expect("must succeed")
};
assert!(
socket_created,
"expected socket path to eventually get created, but never happened"
);
}
let has = client
.has(&fixtures::BLOB_A_DIGEST)
.await
.expect("must not be err");
// prepare a client
let grpc_client = {
let url = url::Url::parse(&format!("grpc+unix://{}", socket_path.display()))
.expect("must parse");
GRPCBlobService::from_url(&url).expect("must succeed")
};
assert!(!has);
});
rt.block_on(task).unwrap()
let has = grpc_client
.has(&fixtures::BLOB_A_DIGEST)
.await
.expect("must not be err");
assert!(!has);
}
}