refactor(tvix/castore/directorysvc/grpc/wrapper): no Arc<_>
We can also drop the Clone requirement. Because the trait is async since some time, there's no need to clone before moving into an async closure, allowing us to simplify the code a bit. Change-Id: I9b0a0e10077d8c548d218207b908bfd92c5b8de0 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10515 Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz> Autosubmit: flokli <flokli@flokli.de>
This commit is contained in:
parent
54fe97e725
commit
96aa220dcf
4 changed files with 47 additions and 53 deletions
|
@ -288,7 +288,7 @@ impl DirectoryPutter for GRPCPutter {
|
||||||
mod tests {
|
mod tests {
|
||||||
use core::time;
|
use core::time;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use std::{any::Any, sync::Arc, time::Duration};
|
use std::{any::Any, time::Duration};
|
||||||
use tempfile::TempDir;
|
use tempfile::TempDir;
|
||||||
use tokio::net::UnixListener;
|
use tokio::net::UnixListener;
|
||||||
use tokio_retry::{strategy::ExponentialBackoff, Retry};
|
use tokio_retry::{strategy::ExponentialBackoff, Retry};
|
||||||
|
@ -460,8 +460,8 @@ mod tests {
|
||||||
let mut server = tonic::transport::Server::builder();
|
let mut server = tonic::transport::Server::builder();
|
||||||
let router = server.add_service(
|
let router = server.add_service(
|
||||||
crate::proto::directory_service_server::DirectoryServiceServer::new(
|
crate::proto::directory_service_server::DirectoryServiceServer::new(
|
||||||
GRPCDirectoryServiceWrapper::from(
|
GRPCDirectoryServiceWrapper::new(
|
||||||
Arc::new(MemoryDirectoryService::default()) as Arc<dyn DirectoryService>
|
Box::<MemoryDirectoryService>::default() as Box<dyn DirectoryService>
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
);
|
);
|
||||||
|
|
|
@ -2,26 +2,27 @@ use crate::proto;
|
||||||
use crate::{directoryservice::DirectoryService, B3Digest};
|
use crate::{directoryservice::DirectoryService, B3Digest};
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::ops::Deref;
|
||||||
use tokio::{sync::mpsc::channel, task};
|
use tokio::sync::mpsc::channel;
|
||||||
use tokio_stream::wrappers::ReceiverStream;
|
use tokio_stream::wrappers::ReceiverStream;
|
||||||
use tonic::{async_trait, Request, Response, Status, Streaming};
|
use tonic::{async_trait, Request, Response, Status, Streaming};
|
||||||
use tracing::{debug, instrument, warn};
|
use tracing::{debug, instrument, warn};
|
||||||
|
|
||||||
pub struct GRPCDirectoryServiceWrapper {
|
pub struct GRPCDirectoryServiceWrapper<T> {
|
||||||
directory_service: Arc<dyn DirectoryService>,
|
directory_service: T,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<Arc<dyn DirectoryService>> for GRPCDirectoryServiceWrapper {
|
impl<T> GRPCDirectoryServiceWrapper<T> {
|
||||||
fn from(value: Arc<dyn DirectoryService>) -> Self {
|
pub fn new(directory_service: T) -> Self {
|
||||||
Self {
|
Self { directory_service }
|
||||||
directory_service: value,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceWrapper {
|
impl<T> proto::directory_service_server::DirectoryService for GRPCDirectoryServiceWrapper<T>
|
||||||
|
where
|
||||||
|
T: Deref<Target = dyn DirectoryService> + Send + Sync + 'static,
|
||||||
|
{
|
||||||
type GetStream = ReceiverStream<tonic::Result<proto::Directory, Status>>;
|
type GetStream = ReceiverStream<tonic::Result<proto::Directory, Status>>;
|
||||||
|
|
||||||
#[instrument(skip(self))]
|
#[instrument(skip(self))]
|
||||||
|
@ -33,50 +34,43 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW
|
||||||
|
|
||||||
let req_inner = request.into_inner();
|
let req_inner = request.into_inner();
|
||||||
|
|
||||||
let directory_service = self.directory_service.clone();
|
// look at the digest in the request and put it in the top of the queue.
|
||||||
|
match &req_inner.by_what {
|
||||||
|
None => return Err(Status::invalid_argument("by_what needs to be specified")),
|
||||||
|
Some(proto::get_directory_request::ByWhat::Digest(ref digest)) => {
|
||||||
|
let digest: B3Digest = digest
|
||||||
|
.clone()
|
||||||
|
.try_into()
|
||||||
|
.map_err(|_e| Status::invalid_argument("invalid digest length"))?;
|
||||||
|
|
||||||
let _task = {
|
if !req_inner.recursive {
|
||||||
// look at the digest in the request and put it in the top of the queue.
|
let e: Result<proto::Directory, Status> =
|
||||||
match &req_inner.by_what {
|
match self.directory_service.get(&digest).await {
|
||||||
None => return Err(Status::invalid_argument("by_what needs to be specified")),
|
Ok(Some(directory)) => Ok(directory),
|
||||||
Some(proto::get_directory_request::ByWhat::Digest(ref digest)) => {
|
Ok(None) => {
|
||||||
let digest: B3Digest = digest
|
Err(Status::not_found(format!("directory {} not found", digest)))
|
||||||
.clone()
|
|
||||||
.try_into()
|
|
||||||
.map_err(|_e| Status::invalid_argument("invalid digest length"))?;
|
|
||||||
|
|
||||||
task::spawn(async move {
|
|
||||||
if !req_inner.recursive {
|
|
||||||
let e: Result<proto::Directory, Status> =
|
|
||||||
match directory_service.get(&digest).await {
|
|
||||||
Ok(Some(directory)) => Ok(directory),
|
|
||||||
Ok(None) => Err(Status::not_found(format!(
|
|
||||||
"directory {} not found",
|
|
||||||
digest
|
|
||||||
))),
|
|
||||||
Err(e) => Err(e.into()),
|
|
||||||
};
|
|
||||||
|
|
||||||
if tx.send(e).await.is_err() {
|
|
||||||
debug!("receiver dropped");
|
|
||||||
}
|
}
|
||||||
} else {
|
Err(e) => Err(e.into()),
|
||||||
// If recursive was requested, traverse via get_recursive.
|
};
|
||||||
let mut directories_it = directory_service.get_recursive(&digest);
|
|
||||||
|
|
||||||
while let Some(e) = directories_it.next().await {
|
if tx.send(e).await.is_err() {
|
||||||
// map err in res from Error to Status
|
debug!("receiver dropped");
|
||||||
let res = e.map_err(|e| Status::internal(e.to_string()));
|
}
|
||||||
if tx.send(res).await.is_err() {
|
} else {
|
||||||
debug!("receiver dropped");
|
// If recursive was requested, traverse via get_recursive.
|
||||||
break;
|
let mut directories_it = self.directory_service.get_recursive(&digest);
|
||||||
}
|
|
||||||
}
|
while let Some(e) = directories_it.next().await {
|
||||||
|
// map err in res from Error to Status
|
||||||
|
let res = e.map_err(|e| Status::internal(e.to_string()));
|
||||||
|
if tx.send(res).await.is_err() {
|
||||||
|
debug!("receiver dropped");
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
|
|
||||||
let receiver_stream = ReceiverStream::new(rx);
|
let receiver_stream = ReceiverStream::new(rx);
|
||||||
Ok(Response::new(receiver_stream))
|
Ok(Response::new(receiver_stream))
|
||||||
|
|
|
@ -35,7 +35,7 @@ pub(crate) async fn gen_directorysvc_grpc_client() -> DirectoryServiceClient<Cha
|
||||||
// spin up a new DirectoryService
|
// spin up a new DirectoryService
|
||||||
let mut server = Server::builder();
|
let mut server = Server::builder();
|
||||||
let router = server.add_service(DirectoryServiceServer::new(
|
let router = server.add_service(DirectoryServiceServer::new(
|
||||||
GRPCDirectoryServiceWrapper::from(gen_directory_service()),
|
GRPCDirectoryServiceWrapper::new(gen_directory_service()),
|
||||||
));
|
));
|
||||||
|
|
||||||
router
|
router
|
||||||
|
|
|
@ -201,7 +201,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
blob_service,
|
blob_service,
|
||||||
)))
|
)))
|
||||||
.add_service(DirectoryServiceServer::new(
|
.add_service(DirectoryServiceServer::new(
|
||||||
GRPCDirectoryServiceWrapper::from(directory_service),
|
GRPCDirectoryServiceWrapper::new(directory_service),
|
||||||
))
|
))
|
||||||
.add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new(
|
.add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new(
|
||||||
Arc::from(path_info_service),
|
Arc::from(path_info_service),
|
||||||
|
|
Loading…
Add table
Reference in a new issue