refactor(tvix/*store): remove some grpc_client let bindings

We had to have these all while the traits where sync, and there was a
lot of spawning and moving.

Most of this can now be removed in favor of some inline `.clone()`.

Change-Id: Id5466c32a403100bc3347866b3172e06a792e311
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9705
Tested-by: BuildkiteCI
Reviewed-by: Connor Brewster <cbrewster@hey.com>
This commit is contained in:
Florian Klink 2023-10-12 15:42:01 +02:00 committed by flokli
parent 0b18be3b57
commit 199e5e0339
3 changed files with 25 additions and 27 deletions

View file

@ -112,12 +112,11 @@ impl BlobService for GRPCBlobService {
&self,
digest: &B3Digest,
) -> Result<Option<Box<dyn BlobReader>>, crate::Error> {
// Get a new handle to the gRPC client, and copy the digest.
let mut grpc_client = self.grpc_client.clone();
// Get a stream of [proto::BlobChunk], or return an error if the blob
// doesn't exist.
let resp = grpc_client
let resp = self
.grpc_client
.clone()
.read(proto::ReadBlobRequest {
digest: digest.clone().into(),
})

View file

@ -139,9 +139,11 @@ impl DirectoryService for GRPCDirectoryService {
}
async fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> {
let mut grpc_client = self.grpc_client.clone();
let resp = grpc_client.put(tokio_stream::iter(vec![directory])).await;
let resp = self
.grpc_client
.clone()
.put(tokio_stream::once(directory))
.await;
match resp {
Ok(put_directory_resp) => Ok(put_directory_resp

View file

@ -54,9 +54,10 @@ impl PathInfoService for GRPCPathInfoService {
.connect_with_connector_lazy(tower::service_fn(
move |_: tonic::transport::Uri| UnixStream::connect(path.clone()),
));
let grpc_client =
proto::path_info_service_client::PathInfoServiceClient::new(channel);
Ok(Self::from_client(grpc_client))
Ok(Self::from_client(
proto::path_info_service_client::PathInfoServiceClient::new(channel),
))
} else {
// ensure path is empty, not supported with gRPC.
if !url.path().is_empty() {
@ -78,19 +79,18 @@ impl PathInfoService for GRPCPathInfoService {
.unwrap()
.connect_lazy();
let grpc_client =
proto::path_info_service_client::PathInfoServiceClient::new(channel);
Ok(Self::from_client(grpc_client))
Ok(Self::from_client(
proto::path_info_service_client::PathInfoServiceClient::new(channel),
))
}
}
}
}
async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
// Get a new handle to the gRPC client.
let mut grpc_client = self.grpc_client.clone();
let path_info = grpc_client
let path_info = self
.grpc_client
.clone()
.get(proto::GetPathInfoRequest {
by_what: Some(proto::get_path_info_request::ByWhat::ByOutputHash(
digest.to_vec().into(),
@ -106,10 +106,9 @@ impl PathInfoService for GRPCPathInfoService {
}
async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
// Get a new handle to the gRPC client.
let mut grpc_client = self.grpc_client.clone();
let path_info = grpc_client
let path_info = self
.grpc_client
.clone()
.put(path_info)
.await
.map_err(|e| Error::StorageError(e.to_string()))?
@ -122,13 +121,11 @@ impl PathInfoService for GRPCPathInfoService {
&self,
root_node: &castorepb::node::Node,
) -> Result<(u64, [u8; 32]), Error> {
// Get a new handle to the gRPC client.
let mut grpc_client = self.grpc_client.clone();
let root_node = root_node.clone();
let path_info = grpc_client
let path_info = self
.grpc_client
.clone()
.calculate_nar(castorepb::Node {
node: Some(root_node),
node: Some(root_node.clone()),
})
.await
.map_err(|e| Error::StorageError(e.to_string()))?