refactor(tvix/store): remove Arc<> from PathInfoService::from_addr

This makes PathInfoService::from_addr return a Box<dyn PathInfoService>,
rather than an Arc<dyn …>, and leaves it up to the consumers to rewrap
it into an Arc where needed.

This allows us to drop the Arc for the tvix-store daemon subcommand.

Change-Id: Ic83aa2ade6c51912281bd17c7eef7252e152b2d1
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10409
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: sterni <sternenseemann@systemli.org>
This commit is contained in:
Florian Klink 2023-12-17 01:32:38 +02:00 committed by flokli
parent 93a228b9a4
commit 52cad86195
5 changed files with 37 additions and 24 deletions

View file

@ -6,6 +6,7 @@ use nix_compat::store_path::StorePath;
use std::io; use std::io;
use std::path::Path; use std::path::Path;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio_listener::Listener; use tokio_listener::Listener;
use tokio_listener::SystemOptions; use tokio_listener::SystemOptions;
@ -21,6 +22,7 @@ use tvix_castore::proto::GRPCBlobServiceWrapper;
use tvix_castore::proto::GRPCDirectoryServiceWrapper; use tvix_castore::proto::GRPCDirectoryServiceWrapper;
use tvix_castore::proto::NamedNode; use tvix_castore::proto::NamedNode;
use tvix_store::pathinfoservice; use tvix_store::pathinfoservice;
use tvix_store::pathinfoservice::PathInfoService;
use tvix_store::proto::nar_info; use tvix_store::proto::nar_info;
use tvix_store::proto::path_info_service_server::PathInfoServiceServer; use tvix_store::proto::path_info_service_server::PathInfoServiceServer;
use tvix_store::proto::GRPCPathInfoServiceWrapper; use tvix_store::proto::GRPCPathInfoServiceWrapper;
@ -217,9 +219,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.add_service(DirectoryServiceServer::new( .add_service(DirectoryServiceServer::new(
GRPCDirectoryServiceWrapper::from(directory_service), GRPCDirectoryServiceWrapper::from(directory_service),
)) ))
.add_service(PathInfoServiceServer::new( .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new(
GRPCPathInfoServiceWrapper::from(path_info_service), path_info_service,
)); )));
#[cfg(feature = "tonic-reflection")] #[cfg(feature = "tonic-reflection")]
{ {
@ -257,6 +259,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
) )
.await?; .await?;
// Arc the PathInfoService, as we clone it .
let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
let tasks = paths let tasks = paths
.into_iter() .into_iter()
.map(|path| { .map(|path| {
@ -356,6 +361,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
) )
.await?; .await?;
// Arc the PathInfoService, as TvixStoreFS requires Clone
let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
let mut fuse_daemon = tokio::task::spawn_blocking(move || { let mut fuse_daemon = tokio::task::spawn_blocking(move || {
let f = TvixStoreFs::new( let f = TvixStoreFs::new(
blob_service, blob_service,
@ -397,6 +405,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
) )
.await?; .await?;
// Arc the PathInfoService, as TvixStoreFS requires Clone
let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let fs = TvixStoreFs::new( let fs = TvixStoreFs::new(
blob_service, blob_service,

View file

@ -31,7 +31,7 @@ pub async fn from_addr(
uri: &str, uri: &str,
blob_service: Arc<dyn BlobService>, blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>, directory_service: Arc<dyn DirectoryService>,
) -> Result<Arc<dyn PathInfoService>, Error> { ) -> Result<Box<dyn PathInfoService>, Error> {
let url = let url =
Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?; Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?;
@ -40,7 +40,7 @@ pub async fn from_addr(
if url.has_host() || !url.path().is_empty() { if url.has_host() || !url.path().is_empty() {
return Err(Error::StorageError("invalid url".to_string())); return Err(Error::StorageError("invalid url".to_string()));
} }
Arc::new(MemoryPathInfoService::new(blob_service, directory_service)) Box::new(MemoryPathInfoService::new(blob_service, directory_service))
} else if url.scheme() == "sled" { } else if url.scheme() == "sled" {
// sled doesn't support host, and a path can be provided (otherwise // sled doesn't support host, and a path can be provided (otherwise
// it'll live in memory only). // it'll live in memory only).
@ -57,12 +57,12 @@ pub async fn from_addr(
// TODO: expose other parameters as URL parameters? // TODO: expose other parameters as URL parameters?
if url.path().is_empty() { if url.path().is_empty() {
return Ok(Arc::new( return Ok(Box::new(
SledPathInfoService::new_temporary(blob_service, directory_service) SledPathInfoService::new_temporary(blob_service, directory_service)
.map_err(|e| Error::StorageError(e.to_string()))?, .map_err(|e| Error::StorageError(e.to_string()))?,
)); ));
} }
return Ok(Arc::new( return Ok(Box::new(
SledPathInfoService::new(url.path(), blob_service, directory_service) SledPathInfoService::new(url.path(), blob_service, directory_service)
.map_err(|e| Error::StorageError(e.to_string()))?, .map_err(|e| Error::StorageError(e.to_string()))?,
)); ));
@ -92,7 +92,7 @@ pub async fn from_addr(
} }
} }
Arc::new(nix_http_path_info_service) Box::new(nix_http_path_info_service)
} else if url.scheme().starts_with("grpc+") { } else if url.scheme().starts_with("grpc+") {
// schemes starting with grpc+ go to the GRPCPathInfoService. // schemes starting with grpc+ go to the GRPCPathInfoService.
// That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. // That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts.
@ -100,7 +100,7 @@ pub async fn from_addr(
// - In the case of non-unix sockets, there must be a host, but no path. // - In the case of non-unix sockets, there must be a host, but no path.
// Constructing the channel is handled by tvix_castore::channel::from_url. // Constructing the channel is handled by tvix_castore::channel::from_url.
let client = PathInfoServiceClient::new(tvix_castore::tonic::channel_from_url(&url).await?); let client = PathInfoServiceClient::new(tvix_castore::tonic::channel_from_url(&url).await?);
Arc::new(GRPCPathInfoService::from_client(client)) Box::new(GRPCPathInfoService::from_client(client))
} else { } else {
Err(Error::StorageError(format!( Err(Error::StorageError(format!(
"unknown scheme: {}", "unknown scheme: {}",

View file

@ -115,7 +115,6 @@ impl PathInfoService for GRPCPathInfoService {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tempfile::TempDir; use tempfile::TempDir;
@ -151,11 +150,11 @@ mod tests {
let mut server = tonic::transport::Server::builder(); let mut server = tonic::transport::Server::builder();
let router = server.add_service( let router = server.add_service(
crate::proto::path_info_service_server::PathInfoServiceServer::new( crate::proto::path_info_service_server::PathInfoServiceServer::new(
GRPCPathInfoServiceWrapper::from(Arc::new(MemoryPathInfoService::new( GRPCPathInfoServiceWrapper::new(Box::new(MemoryPathInfoService::new(
gen_blob_service(), gen_blob_service(),
gen_directory_service(), gen_directory_service(),
)) ))
as Arc<dyn PathInfoService>), as Box<dyn PathInfoService>),
), ),
); );
router.serve_with_incoming(uds_stream).await router.serve_with_incoming(uds_stream).await

View file

@ -2,28 +2,31 @@ use crate::nar::RenderError;
use crate::pathinfoservice::PathInfoService; use crate::pathinfoservice::PathInfoService;
use crate::proto; use crate::proto;
use futures::StreamExt; use futures::StreamExt;
use std::sync::Arc; use std::ops::Deref;
use tokio::task; use tokio::task;
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use tonic::{async_trait, Request, Response, Result, Status}; use tonic::{async_trait, Request, Response, Result, Status};
use tracing::{debug, instrument, warn}; use tracing::{debug, instrument, warn};
use tvix_castore::proto as castorepb; use tvix_castore::proto as castorepb;
pub struct GRPCPathInfoServiceWrapper { pub struct GRPCPathInfoServiceWrapper<PS> {
path_info_service: Arc<dyn PathInfoService>, inner: PS,
// FUTUREWORK: allow exposing without allowing listing // FUTUREWORK: allow exposing without allowing listing
} }
impl From<Arc<dyn PathInfoService>> for GRPCPathInfoServiceWrapper { impl<PS> GRPCPathInfoServiceWrapper<PS> {
fn from(value: Arc<dyn PathInfoService>) -> Self { pub fn new(path_info_service: PS) -> Self {
Self { Self {
path_info_service: value, inner: path_info_service,
} }
} }
} }
#[async_trait] #[async_trait]
impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper { impl<PS> proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper<PS>
where
PS: Deref<Target = dyn PathInfoService> + Send + Sync + 'static,
{
type ListStream = ReceiverStream<tonic::Result<proto::PathInfo, Status>>; type ListStream = ReceiverStream<tonic::Result<proto::PathInfo, Status>>;
#[instrument(skip(self))] #[instrument(skip(self))]
@ -38,7 +41,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra
.to_vec() .to_vec()
.try_into() .try_into()
.map_err(|_e| Status::invalid_argument("invalid output digest length"))?; .map_err(|_e| Status::invalid_argument("invalid output digest length"))?;
match self.path_info_service.get(digest).await { match self.inner.get(digest).await {
Ok(None) => Err(Status::not_found("PathInfo not found")), Ok(None) => Err(Status::not_found("PathInfo not found")),
Ok(Some(path_info)) => Ok(Response::new(path_info)), Ok(Some(path_info)) => Ok(Response::new(path_info)),
Err(e) => { Err(e) => {
@ -56,7 +59,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra
// Store the PathInfo in the client. Clients MUST validate the data // Store the PathInfo in the client. Clients MUST validate the data
// they receive, so we don't validate additionally here. // they receive, so we don't validate additionally here.
match self.path_info_service.put(path_info).await { match self.inner.put(path_info).await {
Ok(path_info_new) => Ok(Response::new(path_info_new)), Ok(path_info_new) => Ok(Response::new(path_info_new)),
Err(e) => { Err(e) => {
warn!("failed to insert PathInfo: {}", e); warn!("failed to insert PathInfo: {}", e);
@ -74,7 +77,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra
None => Err(Status::invalid_argument("no root node sent")), None => Err(Status::invalid_argument("no root node sent")),
Some(root_node) => { Some(root_node) => {
let (nar_size, nar_sha256) = self let (nar_size, nar_sha256) = self
.path_info_service .inner
.calculate_nar(&root_node) .calculate_nar(&root_node)
.await .await
.expect("error during nar calculation"); // TODO: handle error .expect("error during nar calculation"); // TODO: handle error
@ -94,7 +97,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra
) -> Result<Response<Self::ListStream>, Status> { ) -> Result<Response<Self::ListStream>, Status> {
let (tx, rx) = tokio::sync::mpsc::channel(5); let (tx, rx) = tokio::sync::mpsc::channel(5);
let mut stream = self.path_info_service.list(); let mut stream = self.inner.list();
let _task = task::spawn(async move { let _task = task::spawn(async move {
while let Some(e) = stream.next().await { while let Some(e) = stream.next().await {

View file

@ -21,7 +21,7 @@ fn gen_grpc_service(
) -> Arc<dyn GRPCPathInfoService<ListStream = ReceiverStream<Result<PathInfo, tonic::Status>>>> { ) -> Arc<dyn GRPCPathInfoService<ListStream = ReceiverStream<Result<PathInfo, tonic::Status>>>> {
let blob_service = gen_blob_service(); let blob_service = gen_blob_service();
let directory_service = gen_directory_service(); let directory_service = gen_directory_service();
Arc::new(GRPCPathInfoServiceWrapper::from(gen_pathinfo_service( Arc::new(GRPCPathInfoServiceWrapper::new(gen_pathinfo_service(
blob_service, blob_service,
directory_service, directory_service,
))) )))