feat(tvix/store/bin/import): process all path imports concurrently

Change-Id: I3e1428a4725fc2e552e8f37bc0550121117fcef6
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8633
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: tazjin <tazjin@tvl.su>
This commit is contained in:
Florian Klink 2023-05-25 09:04:13 +03:00 committed by clbot
parent dbff1289b8
commit 48b66a8982

View file

@ -1,6 +1,8 @@
use clap::Subcommand;
use data_encoding::BASE64;
use futures::future::try_join_all;
use std::io;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use tracing_subscriber::prelude::*;
@ -10,6 +12,7 @@ use tvix_store::nar::NonCachingNARCalculationService;
use tvix_store::pathinfoservice::SledPathInfoService;
use tvix_store::proto::blob_service_server::BlobServiceServer;
use tvix_store::proto::directory_service_server::DirectoryServiceServer;
use tvix_store::proto::node::Node;
use tvix_store::proto::path_info_service_server::PathInfoServiceServer;
use tvix_store::proto::GRPCBlobServiceWrapper;
use tvix_store::proto::GRPCDirectoryServiceWrapper;
@ -137,44 +140,52 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
nar_calculation_service,
));
for path in paths {
let path_move = path.clone();
let io_move = io.clone();
let path_info = tokio::task::spawn_blocking(move || {
io_move
.import_path_with_pathinfo(&path_move)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))
let tasks = paths
.iter()
.map(|path| {
let io_move = io.clone();
let path = path.clone();
let task: tokio::task::JoinHandle<Result<(), io::Error>> =
tokio::task::spawn_blocking(move || {
let path_info = io_move.import_path_with_pathinfo(&path)?;
print_node(&path_info.node.unwrap().node.unwrap(), &path);
Ok(())
});
task
})
.await??;
.collect::<Vec<tokio::task::JoinHandle<Result<(), io::Error>>>>();
match path_info.node.unwrap().node.unwrap() {
tvix_store::proto::node::Node::Directory(directory_node) => {
info!(
path = ?path,
name = directory_node.name,
digest = BASE64.encode(&directory_node.digest),
"import successful",
)
}
tvix_store::proto::node::Node::File(file_node) => {
info!(
path = ?path,
name = file_node.name,
digest = BASE64.encode(&file_node.digest),
"import successful"
)
}
tvix_store::proto::node::Node::Symlink(symlink_node) => {
info!(
path = ?path,
name = symlink_node.name,
target = symlink_node.target,
"import successful"
)
}
}
}
try_join_all(tasks).await?;
}
};
Ok(())
}
fn print_node(node: &Node, path: &Path) {
match node {
Node::Directory(directory_node) => {
info!(
path = ?path,
name = directory_node.name,
digest = BASE64.encode(&directory_node.digest),
"import successful",
)
}
Node::File(file_node) => {
info!(
path = ?path,
name = file_node.name,
digest = BASE64.encode(&file_node.digest),
"import successful"
)
}
Node::Symlink(symlink_node) => {
info!(
path = ?path,
name = symlink_node.name,
target = symlink_node.target,
"import successful"
)
}
}
}