refactor(tvix/store): move construct_services helper here

This takes three URLs, and constructs Arc'ed
{Blob,Directory,PathInfo}Service, allowing to remove some of the
boilerplate.

Change-Id: I40e7c2b551442ef2acdc543dfc87ab97e7c742bb
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10484
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
This commit is contained in:
Florian Klink 2023-12-31 16:33:26 +02:00 committed by clbot
parent 41935fab70
commit f6d1a56c8c
4 changed files with 68 additions and 94 deletions

View file

@ -1,18 +1,14 @@
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
use std::{fs, path::PathBuf};
use tvix_glue::known_paths::KnownPaths;
use tvix_glue::{builtins::add_derivation_builtins, configure_nix_path};
use clap::Parser;
use rustyline::{error::ReadlineError, Editor};
use tvix_castore::blobservice::{self, BlobService};
use tvix_castore::directoryservice::{self, DirectoryService};
use tvix_eval::observer::{DisassemblingObserver, TracingObserver};
use tvix_eval::Value;
use tvix_glue::tvix_store_io::TvixStoreIO;
use tvix_store::pathinfoservice::{self, PathInfoService};
#[derive(Parser)]
struct Args {
@ -67,33 +63,6 @@ struct Args {
path_info_service_addr: String,
}
/// Construct the three store handles from their addrs.
async fn construct_services(
blob_service_addr: impl AsRef<str>,
directory_service_addr: impl AsRef<str>,
path_info_service_addr: impl AsRef<str>,
) -> std::io::Result<(
Arc<dyn BlobService>,
Arc<dyn DirectoryService>,
Box<dyn PathInfoService>,
)> {
let blob_service: Arc<dyn BlobService> = blobservice::from_addr(blob_service_addr.as_ref())
.await?
.into();
let directory_service: Arc<dyn DirectoryService> =
directoryservice::from_addr(directory_service_addr.as_ref())
.await?
.into();
let path_info_service = pathinfoservice::from_addr(
path_info_service_addr.as_ref(),
blob_service.clone(),
directory_service.clone(),
)
.await?;
Ok((blob_service, directory_service, path_info_service))
}
/// Interprets the given code snippet, printing out warnings, errors
/// and the result itself. The return value indicates whether
/// evaluation succeeded.
@ -109,7 +78,7 @@ fn interpret(code: &str, path: Option<PathBuf>, args: &Args, explain: bool) -> b
let directory_service_addr = args.directory_service_addr.clone();
let path_info_service_addr = args.path_info_service_addr.clone();
async move {
construct_services(
tvix_store::utils::construct_services(
blob_service_addr,
directory_service_addr,
path_info_service_addr,

View file

@ -11,10 +11,6 @@ use tokio_listener::Listener;
use tokio_listener::SystemOptions;
use tokio_listener::UserOptions;
use tracing_subscriber::prelude::*;
use tvix_castore::blobservice;
use tvix_castore::blobservice::BlobService;
use tvix_castore::directoryservice;
use tvix_castore::directoryservice::DirectoryService;
use tvix_castore::import;
use tvix_castore::proto::blob_service_server::BlobServiceServer;
use tvix_castore::proto::directory_service_server::DirectoryServiceServer;
@ -22,7 +18,6 @@ use tvix_castore::proto::node::Node;
use tvix_castore::proto::GRPCBlobServiceWrapper;
use tvix_castore::proto::GRPCDirectoryServiceWrapper;
use tvix_castore::proto::NamedNode;
use tvix_store::pathinfoservice;
use tvix_store::pathinfoservice::PathInfoService;
use tvix_store::proto::nar_info;
use tvix_store::proto::path_info_service_server::PathInfoServiceServer;
@ -196,18 +191,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
path_info_service_addr,
} => {
// initialize stores
let blob_service: Arc<dyn BlobService> =
blobservice::from_addr(&blob_service_addr).await?.into();
let directory_service: Arc<dyn DirectoryService> =
directoryservice::from_addr(&directory_service_addr)
.await?
.into();
let path_info_service = pathinfoservice::from_addr(
&path_info_service_addr,
blob_service.clone(),
directory_service.clone(),
)
.await?;
let (blob_service, directory_service, path_info_service) =
tvix_store::utils::construct_services(
blob_service_addr,
directory_service_addr,
path_info_service_addr,
)
.await?;
let listen_address = listen_address
.unwrap_or_else(|| "[::]:8000".to_string())
@ -225,7 +215,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
GRPCDirectoryServiceWrapper::from(directory_service),
))
.add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new(
path_info_service,
Arc::from(path_info_service),
)));
#[cfg(feature = "tonic-reflection")]
@ -255,18 +245,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
path_info_service_addr,
} => {
// FUTUREWORK: allow flat for single files?
let blob_service: Arc<dyn BlobService> =
blobservice::from_addr(&blob_service_addr).await?.into();
let directory_service: Arc<dyn DirectoryService> =
directoryservice::from_addr(&directory_service_addr)
.await?
.into();
let path_info_service = pathinfoservice::from_addr(
&path_info_service_addr,
blob_service.clone(),
directory_service.clone(),
)
.await?;
let (blob_service, directory_service, path_info_service) =
tvix_store::utils::construct_services(
blob_service_addr,
directory_service_addr,
path_info_service_addr,
)
.await?;
// Arc the PathInfoService, as we clone it .
let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
@ -361,27 +346,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
list_root,
threads,
} => {
let blob_service: Arc<dyn BlobService> =
blobservice::from_addr(&blob_service_addr).await?.into();
let directory_service: Arc<dyn DirectoryService> =
directoryservice::from_addr(&directory_service_addr)
.await?
.into();
let path_info_service = pathinfoservice::from_addr(
&path_info_service_addr,
blob_service.clone(),
directory_service.clone(),
)
.await?;
// Arc the PathInfoService, as TvixStoreFS requires Clone
let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
let (blob_service, directory_service, path_info_service) =
tvix_store::utils::construct_services(
blob_service_addr,
directory_service_addr,
path_info_service_addr,
)
.await?;
let mut fuse_daemon = tokio::task::spawn_blocking(move || {
let fs = make_fs(
blob_service,
directory_service,
path_info_service,
Arc::from(path_info_service),
list_root,
);
info!(mount_path=?dest, "mounting");
@ -409,27 +386,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
path_info_service_addr,
list_root,
} => {
let blob_service: Arc<dyn BlobService> =
blobservice::from_addr(&blob_service_addr).await?.into();
let directory_service: Arc<dyn DirectoryService> =
directoryservice::from_addr(&directory_service_addr)
.await?
.into();
let path_info_service = pathinfoservice::from_addr(
&path_info_service_addr,
blob_service.clone(),
directory_service.clone(),
)
.await?;
// Arc the PathInfoService, as TvixStoreFS requires Clone
let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
let (blob_service, directory_service, path_info_service) =
tvix_store::utils::construct_services(
blob_service_addr,
directory_service_addr,
path_info_service_addr,
)
.await?;
tokio::task::spawn_blocking(move || {
let fs = make_fs(
blob_service,
directory_service,
path_info_service,
Arc::from(path_info_service),
list_root,
);
info!(socket_path=?socket, "starting virtiofs-daemon");

View file

@ -1,6 +1,7 @@
pub mod nar;
pub mod pathinfoservice;
pub mod proto;
pub mod utils;
#[cfg(test)]
mod tests;

35
tvix/store/src/utils.rs Normal file
View file

@ -0,0 +1,35 @@
use std::sync::Arc;
use tvix_castore::{
blobservice::{self, BlobService},
directoryservice::{self, DirectoryService},
};
use crate::pathinfoservice::{self, PathInfoService};
/// Construct the three store handles from their addrs.
pub async fn construct_services(
blob_service_addr: impl AsRef<str>,
directory_service_addr: impl AsRef<str>,
path_info_service_addr: impl AsRef<str>,
) -> std::io::Result<(
Arc<dyn BlobService>,
Arc<dyn DirectoryService>,
Box<dyn PathInfoService>,
)> {
let blob_service: Arc<dyn BlobService> = blobservice::from_addr(blob_service_addr.as_ref())
.await?
.into();
let directory_service: Arc<dyn DirectoryService> =
directoryservice::from_addr(directory_service_addr.as_ref())
.await?
.into();
let path_info_service = pathinfoservice::from_addr(
path_info_service_addr.as_ref(),
blob_service.clone(),
directory_service.clone(),
)
.await?;
Ok((blob_service, directory_service, path_info_service))
}