27ff98000b
To construct various stores at runtime, we need to eliminate associated types from the BlobService trait, and return Box<dyn …> instead of specific types. This also means we can't consume self in the close() method, so everything we write to is put in an Option<>, and during the first close we take from there. Change-Id: Ia523b6ab2f2a5276f51cb5d17e81a5925bce69b6 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8647 Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Reviewed-by: tazjin <tazjin@tvl.su>
233 lines
8 KiB
Rust
233 lines
8 KiB
Rust
use clap::Subcommand;
|
|
use data_encoding::BASE64;
|
|
use futures::future::try_join_all;
|
|
use std::io;
|
|
use std::path::Path;
|
|
use std::path::PathBuf;
|
|
use std::sync::Arc;
|
|
use tracing_subscriber::prelude::*;
|
|
use tvix_store::blobservice::BlobService;
|
|
use tvix_store::blobservice::GRPCBlobService;
|
|
use tvix_store::blobservice::SledBlobService;
|
|
use tvix_store::directoryservice::GRPCDirectoryService;
|
|
use tvix_store::directoryservice::SledDirectoryService;
|
|
use tvix_store::nar::GRPCNARCalculationService;
|
|
use tvix_store::nar::NonCachingNARCalculationService;
|
|
use tvix_store::pathinfoservice::GRPCPathInfoService;
|
|
use tvix_store::pathinfoservice::SledPathInfoService;
|
|
use tvix_store::proto::blob_service_client::BlobServiceClient;
|
|
use tvix_store::proto::blob_service_server::BlobServiceServer;
|
|
use tvix_store::proto::directory_service_client::DirectoryServiceClient;
|
|
use tvix_store::proto::directory_service_server::DirectoryServiceServer;
|
|
use tvix_store::proto::node::Node;
|
|
use tvix_store::proto::path_info_service_client::PathInfoServiceClient;
|
|
use tvix_store::proto::path_info_service_server::PathInfoServiceServer;
|
|
use tvix_store::proto::GRPCBlobServiceWrapper;
|
|
use tvix_store::proto::GRPCDirectoryServiceWrapper;
|
|
use tvix_store::proto::GRPCPathInfoServiceWrapper;
|
|
use tvix_store::TvixStoreIO;
|
|
use tvix_store::FUSE;
|
|
|
|
#[cfg(feature = "reflection")]
|
|
use tvix_store::proto::FILE_DESCRIPTOR_SET;
|
|
|
|
use clap::Parser;
|
|
use tonic::{transport::Server, Result};
|
|
use tracing::{info, Level};
|
|
|
|
#[derive(Parser)]
|
|
#[command(author, version, about, long_about = None)]
|
|
struct Cli {
|
|
/// Whether to log in JSON
|
|
#[arg(long)]
|
|
json: bool,
|
|
|
|
#[arg(long)]
|
|
log_level: Option<Level>,
|
|
|
|
#[command(subcommand)]
|
|
command: Commands,
|
|
}
|
|
|
|
#[derive(Subcommand)]
|
|
enum Commands {
|
|
/// Runs the tvix-store daemon.
|
|
Daemon {
|
|
#[arg(long, short = 'l')]
|
|
listen_address: Option<String>,
|
|
},
|
|
/// Imports a list of paths into the store (not using the daemon)
|
|
Import {
|
|
#[clap(value_name = "PATH")]
|
|
paths: Vec<PathBuf>,
|
|
},
|
|
/// Mounts a tvix-store at the given mountpoint
|
|
#[cfg(feature = "fuse")]
|
|
Mount {
|
|
#[clap(value_name = "PATH")]
|
|
dest: PathBuf,
|
|
},
|
|
}
|
|
|
|
#[tokio::main]
|
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|
let cli = Cli::parse();
|
|
|
|
// configure log settings
|
|
let level = cli.log_level.unwrap_or(Level::INFO);
|
|
|
|
let subscriber = tracing_subscriber::registry()
|
|
.with(if cli.json {
|
|
Some(
|
|
tracing_subscriber::fmt::Layer::new()
|
|
.with_writer(io::stdout.with_max_level(level))
|
|
.json(),
|
|
)
|
|
} else {
|
|
None
|
|
})
|
|
.with(if !cli.json {
|
|
Some(
|
|
tracing_subscriber::fmt::Layer::new()
|
|
.with_writer(io::stdout.with_max_level(level))
|
|
.pretty(),
|
|
)
|
|
} else {
|
|
None
|
|
});
|
|
|
|
tracing::subscriber::set_global_default(subscriber).expect("Unable to set global subscriber");
|
|
|
|
match cli.command {
|
|
Commands::Daemon { listen_address } => {
|
|
// initialize stores
|
|
let blob_service = SledBlobService::new("blobs.sled".into())?;
|
|
let directory_service = SledDirectoryService::new("directories.sled".into())?;
|
|
let path_info_service = SledPathInfoService::new("pathinfo.sled".into())?;
|
|
|
|
let listen_address = listen_address
|
|
.unwrap_or_else(|| "[::]:8000".to_string())
|
|
.parse()
|
|
.unwrap();
|
|
|
|
let mut server = Server::builder();
|
|
|
|
let nar_calculation_service = NonCachingNARCalculationService::new(
|
|
Box::new(blob_service.clone()),
|
|
directory_service.clone(),
|
|
);
|
|
|
|
#[allow(unused_mut)]
|
|
let mut router = server
|
|
.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::from(
|
|
Box::new(blob_service) as Box<dyn BlobService>,
|
|
)))
|
|
.add_service(DirectoryServiceServer::new(
|
|
GRPCDirectoryServiceWrapper::from(directory_service),
|
|
))
|
|
.add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new(
|
|
path_info_service,
|
|
nar_calculation_service,
|
|
)));
|
|
|
|
#[cfg(feature = "reflection")]
|
|
{
|
|
let reflection_svc = tonic_reflection::server::Builder::configure()
|
|
.register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
|
|
.build()?;
|
|
router = router.add_service(reflection_svc);
|
|
}
|
|
|
|
info!("tvix-store listening on {}", listen_address);
|
|
|
|
router.serve(listen_address).await?;
|
|
}
|
|
Commands::Import { paths } => {
|
|
let blob_service = GRPCBlobService::from_client(
|
|
BlobServiceClient::connect("http://[::1]:8000").await?,
|
|
);
|
|
let directory_service = GRPCDirectoryService::from_client(
|
|
DirectoryServiceClient::connect("http://[::1]:8000").await?,
|
|
);
|
|
let path_info_service_client =
|
|
PathInfoServiceClient::connect("http://[::1]:8000").await?;
|
|
let path_info_service =
|
|
GRPCPathInfoService::from_client(path_info_service_client.clone());
|
|
let nar_calculation_service =
|
|
GRPCNARCalculationService::from_client(path_info_service_client);
|
|
|
|
let io = Arc::new(TvixStoreIO::new(
|
|
Box::new(blob_service),
|
|
directory_service,
|
|
path_info_service,
|
|
nar_calculation_service,
|
|
));
|
|
|
|
let tasks = paths
|
|
.iter()
|
|
.map(|path| {
|
|
let io_move = io.clone();
|
|
let path = path.clone();
|
|
let task: tokio::task::JoinHandle<Result<(), io::Error>> =
|
|
tokio::task::spawn_blocking(move || {
|
|
let path_info = io_move.import_path_with_pathinfo(&path)?;
|
|
print_node(&path_info.node.unwrap().node.unwrap(), &path);
|
|
Ok(())
|
|
});
|
|
task
|
|
})
|
|
.collect::<Vec<tokio::task::JoinHandle<Result<(), io::Error>>>>();
|
|
|
|
try_join_all(tasks).await?;
|
|
}
|
|
#[cfg(feature = "fuse")]
|
|
Commands::Mount { dest } => {
|
|
let blob_service = GRPCBlobService::from_client(
|
|
BlobServiceClient::connect("http://[::1]:8000").await?,
|
|
);
|
|
let directory_service = GRPCDirectoryService::from_client(
|
|
DirectoryServiceClient::connect("http://[::1]:8000").await?,
|
|
);
|
|
let path_info_service_client =
|
|
PathInfoServiceClient::connect("http://[::1]:8000").await?;
|
|
let path_info_service =
|
|
GRPCPathInfoService::from_client(path_info_service_client.clone());
|
|
|
|
tokio::task::spawn_blocking(move || {
|
|
let f = FUSE::new(path_info_service, directory_service, blob_service);
|
|
fuser::mount2(f, &dest, &[])
|
|
})
|
|
.await??
|
|
}
|
|
};
|
|
Ok(())
|
|
}
|
|
|
|
fn print_node(node: &Node, path: &Path) {
|
|
match node {
|
|
Node::Directory(directory_node) => {
|
|
info!(
|
|
path = ?path,
|
|
name = directory_node.name,
|
|
digest = BASE64.encode(&directory_node.digest),
|
|
"import successful",
|
|
)
|
|
}
|
|
Node::File(file_node) => {
|
|
info!(
|
|
path = ?path,
|
|
name = file_node.name,
|
|
digest = BASE64.encode(&file_node.digest),
|
|
"import successful"
|
|
)
|
|
}
|
|
Node::Symlink(symlink_node) => {
|
|
info!(
|
|
path = ?path,
|
|
name = symlink_node.name,
|
|
target = symlink_node.target,
|
|
"import successful"
|
|
)
|
|
}
|
|
}
|
|
}
|