refactor(tvix/castore/blobsvc/grpc/wrapper): don't require Arc<_>
Change-Id: I9655f5588c7dc98427de6af47d74b4ab7ce22071 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10516 Tested-by: BuildkiteCI Autosubmit: flokli <flokli@flokli.de> Reviewed-by: raitobezarius <tvl@lahfa.xyz>
This commit is contained in:
parent
96aa220dcf
commit
1b62f82b10
4 changed files with 14 additions and 15 deletions
|
@ -221,7 +221,6 @@ impl<W: tokio::io::AsyncWrite + Unpin> tokio::io::AsyncWrite for GRPCBlobWriter<
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::sync::Arc;
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use tempfile::TempDir;
|
use tempfile::TempDir;
|
||||||
|
@ -255,8 +254,8 @@ mod tests {
|
||||||
let mut server = tonic::transport::Server::builder();
|
let mut server = tonic::transport::Server::builder();
|
||||||
let router =
|
let router =
|
||||||
server.add_service(crate::proto::blob_service_server::BlobServiceServer::new(
|
server.add_service(crate::proto::blob_service_server::BlobServiceServer::new(
|
||||||
GRPCBlobServiceWrapper::from(
|
GRPCBlobServiceWrapper::new(
|
||||||
Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>
|
Box::<MemoryBlobService>::default() as Box<dyn BlobService>
|
||||||
),
|
),
|
||||||
));
|
));
|
||||||
router.serve_with_incoming(uds_stream).await
|
router.serve_with_incoming(uds_stream).await
|
||||||
|
|
|
@ -6,22 +6,19 @@ use std::{
|
||||||
io,
|
io,
|
||||||
ops::{Deref, DerefMut},
|
ops::{Deref, DerefMut},
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
sync::Arc,
|
|
||||||
};
|
};
|
||||||
use tokio_stream::StreamExt;
|
use tokio_stream::StreamExt;
|
||||||
use tokio_util::io::ReaderStream;
|
use tokio_util::io::ReaderStream;
|
||||||
use tonic::{async_trait, Request, Response, Status, Streaming};
|
use tonic::{async_trait, Request, Response, Status, Streaming};
|
||||||
use tracing::{instrument, warn};
|
use tracing::{instrument, warn};
|
||||||
|
|
||||||
pub struct GRPCBlobServiceWrapper {
|
pub struct GRPCBlobServiceWrapper<T> {
|
||||||
blob_service: Arc<dyn BlobService>,
|
blob_service: T,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<Arc<dyn BlobService>> for GRPCBlobServiceWrapper {
|
impl<T> GRPCBlobServiceWrapper<T> {
|
||||||
fn from(value: Arc<dyn BlobService>) -> Self {
|
pub fn new(blob_service: T) -> Self {
|
||||||
Self {
|
Self { blob_service }
|
||||||
blob_service: value,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -84,7 +81,10 @@ unsafe impl<const N: usize> bytes::BufMut for BytesMutWithDefaultCapacity<N> {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper {
|
impl<T> super::blob_service_server::BlobService for GRPCBlobServiceWrapper<T>
|
||||||
|
where
|
||||||
|
T: Deref<Target = dyn BlobService> + Send + Sync + 'static,
|
||||||
|
{
|
||||||
// https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933
|
// https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933
|
||||||
type ReadStream =
|
type ReadStream =
|
||||||
Pin<Box<dyn futures::Stream<Item = Result<super::BlobChunk, Status>> + Send + 'static>>;
|
Pin<Box<dyn futures::Stream<Item = Result<super::BlobChunk, Status>> + Send + 'static>>;
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
//! A crate containing constructors to provide instances of a BlobService and
|
//! A crate containing constructors to provide instances of a BlobService and
|
||||||
//! DirectoryService. Only used for testing purposes, but across crates.
|
//! DirectoryService. Only used for testing purposes, but across crates.
|
||||||
//! Should be removed once we have a better concept of a "Service registry".
|
//! Should be removed once we have a better concept of a "Service registry".
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use tonic::transport::{Channel, Endpoint, Server, Uri};
|
use tonic::transport::{Channel, Endpoint, Server, Uri};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
@ -67,7 +67,7 @@ pub(crate) async fn gen_blobsvc_grpc_client() -> BlobServiceClient<Channel> {
|
||||||
tokio::spawn(async {
|
tokio::spawn(async {
|
||||||
// spin up a new DirectoryService
|
// spin up a new DirectoryService
|
||||||
let mut server = Server::builder();
|
let mut server = Server::builder();
|
||||||
let router = server.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::from(
|
let router = server.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new(
|
||||||
gen_blob_service(),
|
gen_blob_service(),
|
||||||
)));
|
)));
|
||||||
|
|
||||||
|
|
|
@ -197,7 +197,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
|
||||||
#[allow(unused_mut)]
|
#[allow(unused_mut)]
|
||||||
let mut router = server
|
let mut router = server
|
||||||
.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::from(
|
.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new(
|
||||||
blob_service,
|
blob_service,
|
||||||
)))
|
)))
|
||||||
.add_service(DirectoryServiceServer::new(
|
.add_service(DirectoryServiceServer::new(
|
||||||
|
|
Loading…
Add table
Reference in a new issue