From 1b62f82b10d82f1fb503daf52184ce5f72d0288f Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Mon, 1 Jan 2024 03:23:23 +0200 Subject: [PATCH] refactor(tvix/castore/blobsvc/grpc/wrapper): don't require Arc<_> Change-Id: I9655f5588c7dc98427de6af47d74b4ab7ce22071 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10516 Tested-by: BuildkiteCI Autosubmit: flokli Reviewed-by: raitobezarius --- tvix/castore/src/blobservice/grpc.rs | 5 ++--- .../src/proto/grpc_blobservice_wrapper.rs | 18 +++++++++--------- tvix/castore/src/utils.rs | 4 ++-- tvix/store/src/bin/tvix-store.rs | 2 +- 4 files changed, 14 insertions(+), 15 deletions(-) diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index f90240d88..faf50eff8 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -221,7 +221,6 @@ impl tokio::io::AsyncWrite for GRPCBlobWriter< #[cfg(test)] mod tests { - use std::sync::Arc; use std::time::Duration; use tempfile::TempDir; @@ -255,8 +254,8 @@ mod tests { let mut server = tonic::transport::Server::builder(); let router = server.add_service(crate::proto::blob_service_server::BlobServiceServer::new( - GRPCBlobServiceWrapper::from( - Arc::new(MemoryBlobService::default()) as Arc + GRPCBlobServiceWrapper::new( + Box::::default() as Box ), )); router.serve_with_incoming(uds_stream).await diff --git a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs index a37cc299b..063f0421d 100644 --- a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs @@ -6,22 +6,19 @@ use std::{ io, ops::{Deref, DerefMut}, pin::Pin, - sync::Arc, }; use tokio_stream::StreamExt; use tokio_util::io::ReaderStream; use tonic::{async_trait, Request, Response, Status, Streaming}; use tracing::{instrument, warn}; -pub struct GRPCBlobServiceWrapper { - blob_service: Arc, +pub struct GRPCBlobServiceWrapper { + blob_service: T, } -impl From> for GRPCBlobServiceWrapper { - fn from(value: Arc) -> Self { - Self { - blob_service: value, - } +impl GRPCBlobServiceWrapper { + pub fn new(blob_service: T) -> Self { + Self { blob_service } } } @@ -84,7 +81,10 @@ unsafe impl bytes::BufMut for BytesMutWithDefaultCapacity { } #[async_trait] -impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { +impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper +where + T: Deref + Send + Sync + 'static, +{ // https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933 type ReadStream = Pin> + Send + 'static>>; diff --git a/tvix/castore/src/utils.rs b/tvix/castore/src/utils.rs index b24627ed9..d9a7254a5 100644 --- a/tvix/castore/src/utils.rs +++ b/tvix/castore/src/utils.rs @@ -1,8 +1,8 @@ //! A crate containing constructors to provide instances of a BlobService and //! DirectoryService. Only used for testing purposes, but across crates. //! Should be removed once we have a better concept of a "Service registry". - use std::sync::Arc; + use tonic::transport::{Channel, Endpoint, Server, Uri}; use crate::{ @@ -67,7 +67,7 @@ pub(crate) async fn gen_blobsvc_grpc_client() -> BlobServiceClient { tokio::spawn(async { // spin up a new DirectoryService let mut server = Server::builder(); - let router = server.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::from( + let router = server.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new( gen_blob_service(), ))); diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 59c39f7b4..a245af78d 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -197,7 +197,7 @@ async fn main() -> Result<(), Box> { #[allow(unused_mut)] let mut router = server - .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::from( + .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new( blob_service, ))) .add_service(DirectoryServiceServer::new(