diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index db9d4a9c0..b0544387b 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -281,7 +281,6 @@ impl tokio::io::AsyncWrite for GRPCBlobWriter< #[cfg(test)] mod tests { use std::sync::Arc; - use std::thread; use tempfile::TempDir; use tokio::net::UnixListener; @@ -350,77 +349,60 @@ mod tests { assert!(GRPCBlobService::from_url(&url).is_err()); } - /// This uses the correct scheme for a unix socket, and provides a server on the other side. - /// This is not a tokio::test, because spawn two separate tokio runtimes and - // want to have explicit control. - #[test] - fn test_valid_unix_path_ping_pong() { + /// This ensures connecting via gRPC works as expected. + #[tokio::test] + async fn test_valid_unix_path_ping_pong() { let tmpdir = TempDir::new().unwrap(); - let path = tmpdir.path().join("daemon"); + let socket_path = tmpdir.path().join("daemon"); - let path_clone = path.clone(); + let path_clone = socket_path.clone(); - // Spin up a server, in a thread far away, which spawns its own tokio runtime, - // and blocks on the task. - thread::spawn(move || { - // Create the runtime - let rt = tokio::runtime::Runtime::new().unwrap(); + // Spin up a server + tokio::spawn(async { + let uds = UnixListener::bind(path_clone).unwrap(); + let uds_stream = UnixListenerStream::new(uds); - let task = rt.spawn(async { - let uds = UnixListener::bind(path_clone).unwrap(); - let uds_stream = UnixListenerStream::new(uds); - - // spin up a new server - let mut server = tonic::transport::Server::builder(); - let router = - server.add_service(crate::proto::blob_service_server::BlobServiceServer::new( - GRPCBlobServiceWrapper::from( - Arc::new(MemoryBlobService::default()) as Arc - ), - )); - router.serve_with_incoming(uds_stream).await - }); - - rt.block_on(task).unwrap().unwrap(); + // spin up a new server + let mut server = tonic::transport::Server::builder(); + let router = + server.add_service(crate::proto::blob_service_server::BlobServiceServer::new( + GRPCBlobServiceWrapper::from( + Arc::new(MemoryBlobService::default()) as Arc + ), + )); + router.serve_with_incoming(uds_stream).await }); - // Now create another tokio runtime which we'll use in the main test code. - let rt = tokio::runtime::Runtime::new().unwrap(); - - let task = rt.spawn(async move { - // wait for the socket to be created - { - let mut socket_created = false; - // TODO: exponential backoff urgently - for _try in 1..20 { - if path.exists() { - socket_created = true; - break; - } - tokio::time::sleep(time::Duration::from_millis(20)).await; + // wait for the socket to be created + { + let mut socket_created = false; + // TODO: exponential backoff urgently + for _try in 1..20 { + if socket_path.exists() { + socket_created = true; + break; } - - assert!( - socket_created, - "expected socket path to eventually get created, but never happened" - ); + tokio::time::sleep(time::Duration::from_millis(20)).await; } - // prepare a client - let client = { - let mut url = - url::Url::parse("grpc+unix:///path/to/somewhere").expect("must parse"); - url.set_path(path.to_str().unwrap()); - GRPCBlobService::from_url(&url).expect("must succeed") - }; + assert!( + socket_created, + "expected socket path to eventually get created, but never happened" + ); + } - let has = client - .has(&fixtures::BLOB_A_DIGEST) - .await - .expect("must not be err"); + // prepare a client + let grpc_client = { + let url = url::Url::parse(&format!("grpc+unix://{}", socket_path.display())) + .expect("must parse"); + GRPCBlobService::from_url(&url).expect("must succeed") + }; - assert!(!has); - }); - rt.block_on(task).unwrap() + let has = grpc_client + .has(&fixtures::BLOB_A_DIGEST) + .await + .expect("must not be err"); + + assert!(!has); } }