fix(tvix/castore/grpc): don't use explicit channel

We can just use the `BoxStream` directly, or a `once` with the single
`Directory`.

In the recursive case, we also did not properly close the channel after
the first error.

Change-Id: Ifad56d307fc7861107b6d3cffd28d35631d526e6
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11635
Tested-by: BuildkiteCI
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Autosubmit: flokli <flokli@flokli.de>
This commit is contained in:
Florian Klink 2024-05-12 14:52:09 +03:00 committed by clbot
parent b26569028c
commit 30995a0990

View file

@ -1,12 +1,12 @@
use crate::directoryservice::ClosureValidator; use crate::directoryservice::ClosureValidator;
use crate::proto; use crate::proto;
use crate::{directoryservice::DirectoryService, B3Digest}; use crate::{directoryservice::DirectoryService, B3Digest};
use futures::StreamExt; use futures::stream::BoxStream;
use futures::TryStreamExt;
use std::ops::Deref; use std::ops::Deref;
use tokio::sync::mpsc::channel; use tokio_stream::once;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{async_trait, Request, Response, Status, Streaming}; use tonic::{async_trait, Request, Response, Status, Streaming};
use tracing::{debug, instrument, warn}; use tracing::{instrument, warn};
pub struct GRPCDirectoryServiceWrapper<T> { pub struct GRPCDirectoryServiceWrapper<T> {
directory_service: T, directory_service: T,
@ -23,64 +23,53 @@ impl<T> proto::directory_service_server::DirectoryService for GRPCDirectoryServi
where where
T: Deref<Target = dyn DirectoryService> + Send + Sync + 'static, T: Deref<Target = dyn DirectoryService> + Send + Sync + 'static,
{ {
type GetStream = ReceiverStream<tonic::Result<proto::Directory, Status>>; type GetStream = BoxStream<'static, tonic::Result<proto::Directory, Status>>;
#[instrument(skip_all)] #[instrument(skip_all)]
async fn get( async fn get<'a>(
&self, &'a self,
request: Request<proto::GetDirectoryRequest>, request: Request<proto::GetDirectoryRequest>,
) -> Result<Response<Self::GetStream>, Status> { ) -> Result<Response<Self::GetStream>, Status> {
let (tx, rx) = channel(5);
let req_inner = request.into_inner(); let req_inner = request.into_inner();
// look at the digest in the request and put it in the top of the queue. let by_what = &req_inner
match &req_inner.by_what { .by_what
None => return Err(Status::invalid_argument("by_what needs to be specified")), .ok_or_else(|| Status::invalid_argument("invalid by_what"))?;
Some(proto::get_directory_request::ByWhat::Digest(ref digest)) => {
match by_what {
proto::get_directory_request::ByWhat::Digest(ref digest) => {
let digest: B3Digest = digest let digest: B3Digest = digest
.clone() .clone()
.try_into() .try_into()
.map_err(|_e| Status::invalid_argument("invalid digest length"))?; .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
Ok(tonic::Response::new({
if !req_inner.recursive { if !req_inner.recursive {
let e: Result<proto::Directory, Status> = match self let directory = self
.directory_service .directory_service
.get(&digest) .get(&digest)
.await .await
{ .map_err(|e| {
Ok(Some(directory)) => Ok(directory),
Ok(None) => {
Err(Status::not_found(format!("directory {} not found", digest)))
}
Err(e) => {
warn!(err = %e, directory.digest=%digest, "failed to get directory"); warn!(err = %e, directory.digest=%digest, "failed to get directory");
Err(e.into()) tonic::Status::new(tonic::Code::Internal, e.to_string())
} })?
}; .ok_or_else(|| {
Status::not_found(format!("directory {} not found", digest))
})?;
if tx.send(e).await.is_err() { Box::pin(once(Ok(directory)))
debug!("receiver dropped");
}
} else { } else {
// If recursive was requested, traverse via get_recursive. // If recursive was requested, traverse via get_recursive.
let mut directories_it = self.directory_service.get_recursive(&digest); Box::pin(
self.directory_service.get_recursive(&digest).map_err(|e| {
while let Some(e) = directories_it.next().await { tonic::Status::new(tonic::Code::Internal, e.to_string())
// map err in res from Error to Status }),
let res = e.map_err(|e| Status::internal(e.to_string())); )
if tx.send(res).await.is_err() { }
debug!("receiver dropped"); }))
break;
} }
} }
} }
}
}
let receiver_stream = ReceiverStream::new(rx);
Ok(Response::new(receiver_stream))
}
#[instrument(skip_all)] #[instrument(skip_all)]
async fn put( async fn put(