refactor(tvix/castore): move tests to grpc client, rm tonic-mock

Similar to gen_directorysvc_grpc_client, introduce a
gen_blobsvc_grpc_client function that provides a gRPC client connected
to a blobservice.

The test is update to use that client to test against, rather than the
server trait, removing the last usage of tonic_mock, so it's removed
as well.

Fixes b/243.

Change-Id: If746e8600588da247eb53a63b70fe72f139e9e77
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9564
Tested-by: BuildkiteCI
Reviewed-by: tazjin <tazjin@tvl.su>
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Autosubmit: flokli <flokli@flokli.de>
This commit is contained in:
Florian Klink 2023-10-08 11:31:45 +02:00 committed by clbot
parent e778a33710
commit c847cc32d9
8 changed files with 63 additions and 97 deletions

View file

@ -34,7 +34,6 @@ tonic-build = "0.8.2"
[dev-dependencies]
test-case = "2.2.2"
tempfile = "3.3.0"
tonic-mock = { git = "https://github.com/brainrake/tonic-mock", branch = "bump-dependencies" }
[features]
default = []

View file

@ -1,23 +1,17 @@
use crate::fixtures::{BLOB_A, BLOB_A_DIGEST};
use crate::proto::blob_service_server::BlobService as GRPCBlobService;
use crate::proto::{BlobChunk, GRPCBlobServiceWrapper, ReadBlobRequest, StatBlobRequest};
use crate::utils::gen_blob_service;
use crate::proto::{BlobChunk, ReadBlobRequest, StatBlobRequest};
use crate::utils::gen_blobsvc_grpc_client;
use tokio_stream::StreamExt;
fn gen_grpc_blob_service() -> GRPCBlobServiceWrapper {
let blob_service = gen_blob_service();
GRPCBlobServiceWrapper::from(blob_service)
}
/// Trying to read a non-existent blob should return a not found error.
#[tokio::test]
async fn not_found_read() {
let service = gen_grpc_blob_service();
let mut grpc_client = gen_blobsvc_grpc_client().await;
let resp = service
.read(tonic::Request::new(ReadBlobRequest {
let resp = grpc_client
.read(ReadBlobRequest {
digest: BLOB_A_DIGEST.clone().into(),
}))
})
.await;
// We can't use unwrap_err here, because the Ok value doesn't implement
@ -32,13 +26,13 @@ async fn not_found_read() {
/// Trying to stat a non-existent blob should return a not found error.
#[tokio::test]
async fn not_found_stat() {
let service = gen_grpc_blob_service();
let mut grpc_client = gen_blobsvc_grpc_client().await;
let resp = service
.stat(tonic::Request::new(StatBlobRequest {
let resp = grpc_client
.stat(StatBlobRequest {
digest: BLOB_A_DIGEST.clone().into(),
..Default::default()
}))
})
.await
.expect_err("must fail");
@ -49,13 +43,13 @@ async fn not_found_stat() {
/// Put a blob in the store, get it back.
#[tokio::test]
async fn put_read_stat() {
let service = gen_grpc_blob_service();
let mut grpc_client = gen_blobsvc_grpc_client().await;
// Send blob A.
let put_resp = service
.put(tonic_mock::streaming_request(vec![BlobChunk {
let put_resp = grpc_client
.put(tokio_stream::once(BlobChunk {
data: BLOB_A.clone(),
}]))
}))
.await
.expect("must succeed")
.into_inner();
@ -65,20 +59,20 @@ async fn put_read_stat() {
// Stat for the digest of A.
// We currently don't ask for more granular chunking data, as we don't
// expose it yet.
let _resp = service
.stat(tonic::Request::new(StatBlobRequest {
let _resp = grpc_client
.stat(StatBlobRequest {
digest: BLOB_A_DIGEST.clone().into(),
..Default::default()
}))
})
.await
.expect("must succeed")
.into_inner();
// Read the blob. It should return the same data.
let resp = service
.read(tonic::Request::new(ReadBlobRequest {
let resp = grpc_client
.read(ReadBlobRequest {
digest: BLOB_A_DIGEST.clone().into(),
}))
})
.await;
let mut rx = resp.ok().unwrap().into_inner();

View file

@ -16,9 +16,7 @@ async fn get_directories(
grpc_client: &mut DirectoryServiceClient<Channel>,
get_directory_request: GetDirectoryRequest,
) -> Result<Vec<Directory>, Status> {
let resp = grpc_client
.get(tonic::Request::new(get_directory_request))
.await;
let resp = grpc_client.get(get_directory_request).await;
// if the response is an error itself, return the error, otherwise unpack
let stream = match resp {
@ -39,10 +37,10 @@ async fn not_found() {
let mut grpc_client = gen_directorysvc_grpc_client().await;
let resp = grpc_client
.get(tonic::Request::new(GetDirectoryRequest {
.get(GetDirectoryRequest {
by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().into())),
..Default::default()
}))
})
.await;
let stream = resp.expect("must succeed").into_inner();

View file

@ -12,8 +12,10 @@ use crate::{
blobservice::{BlobService, MemoryBlobService},
directoryservice::{DirectoryService, MemoryDirectoryService},
proto::{
blob_service_client::BlobServiceClient, blob_service_server::BlobServiceServer,
directory_service_client::DirectoryServiceClient,
directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper,
directory_service_server::DirectoryServiceServer, GRPCBlobServiceWrapper,
GRPCDirectoryServiceWrapper,
},
};
@ -34,9 +36,8 @@ pin_project! {
}
}
/// This will spawn the a gRPC server with a DirectoryService client, and
/// connect a gRPC DirectoryService client.
/// The client is returned.
/// This will spawn the a gRPC server with a DirectoryService client, connect a
/// gRPC DirectoryService client and return it.
#[allow(dead_code)]
pub(crate) async fn gen_directorysvc_grpc_client() -> DirectoryServiceClient<Channel> {
let (left, right) = tokio::io::duplex(64);
@ -69,3 +70,38 @@ pub(crate) async fn gen_directorysvc_grpc_client() -> DirectoryServiceClient<Cha
grpc_client
}
/// This will spawn the a gRPC server with a BlobService client, connect a
/// gRPC BlobService client and return it.
#[allow(dead_code)]
pub(crate) async fn gen_blobsvc_grpc_client() -> BlobServiceClient<Channel> {
let (left, right) = tokio::io::duplex(64);
// spin up a server, which will only connect once, to the left side.
tokio::spawn(async {
// spin up a new DirectoryService
let mut server = Server::builder();
let router = server.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::from(
gen_blob_service(),
)));
router
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left)))
.await
});
// Create a client, connecting to the right side. The URI is unused.
let mut maybe_right = Some(right);
let grpc_client = BlobServiceClient::new(
Endpoint::try_from("http://[::]:50051")
.unwrap()
.connect_with_connector(tower::service_fn(move |_: Uri| {
let right = maybe_right.take().unwrap();
async move { Ok::<_, std::io::Error>(right) }
}))
.await
.unwrap(),
);
grpc_client
}