diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs index 5708579ba..78eac6f61 100644 --- a/tvix/castore/src/import/fs.rs +++ b/tvix/castore/src/import/fs.rs @@ -8,7 +8,9 @@ use std::os::unix::fs::MetadataExt; use std::os::unix::fs::PermissionsExt; use tokio::io::BufReader; use tokio_util::io::InspectReader; +use tracing::info_span; use tracing::instrument; +use tracing::Instrument; use tracing::Span; use tracing_indicatif::span_ext::IndicatifSpanExt; use walkdir::DirEntry; @@ -30,7 +32,11 @@ use super::IngestionError; /// /// This function will walk the filesystem using `walkdir` and will consume /// `O(#number of entries)` space. -#[instrument(skip(blob_service, directory_service, reference_scanner), fields(path, indicatif.pb_show=1), err)] +#[instrument( + skip(blob_service, directory_service, reference_scanner), + fields(path), + err +)] pub async fn ingest_path( blob_service: BS, directory_service: DS, @@ -44,8 +50,6 @@ where P2: AsRef<[u8]> + Send + Sync, { let span = Span::current(); - span.pb_set_message(&format!("Ingesting {:?}", path)); - span.pb_start(); let iter = WalkDir::new(path.as_ref()) .follow_links(false) @@ -158,8 +162,15 @@ where .metadata() .map_err(|e| Error::Stat(entry.path().to_path_buf(), e.into()))?; - let digest = - upload_blob(blob_service, entry.path().to_path_buf(), reference_scanner).await?; + let digest = upload_blob(blob_service, entry.path().to_path_buf(), reference_scanner) + .instrument({ + let span = info_span!("upload_blob", "indicatif.pb_show" = tracing::field::Empty); + span.pb_set_message(&format!("Uploading blob for {:?}", fs_path)); + span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE); + + span + }) + .await?; Ok(IngestionEntry::Regular { path, @@ -175,7 +186,7 @@ where } /// Uploads the file at the provided [Path] the the [BlobService]. -#[instrument(skip(blob_service, reference_scanner), fields(path, indicatif.pb_show=1), err)] +#[instrument(skip(blob_service, reference_scanner), fields(path), err)] async fn upload_blob( blob_service: BS, path: impl AsRef, @@ -186,8 +197,6 @@ where P: AsRef<[u8]>, { let span = Span::current(); - span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE); - span.pb_set_message(&format!("Uploading blob for {:?}", path.as_ref())); span.pb_start(); let file = tokio::fs::File::open(path.as_ref()) diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 6cc7c39ab..1aed0a8e4 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -1,9 +1,11 @@ use clap::Parser; use clap::Subcommand; -use futures::future::try_join_all; use futures::StreamExt; use futures::TryStreamExt; +use nix_compat::nix_daemon::de::Error; +use nix_compat::nixhash::CAHash; +use nix_compat::nixhash::NixHash; use nix_compat::{path_info::ExportedPathInfo, store_path::StorePath}; use serde::Deserialize; use serde::Serialize; @@ -12,11 +14,13 @@ use std::sync::Arc; use tonic::transport::Server; use tower::ServiceBuilder; use tower_http::trace::{DefaultMakeSpan, TraceLayer}; -use tracing::{info, info_span, instrument, Level, Span}; -use tracing_indicatif::span_ext::IndicatifSpanExt as _; +use tracing::{debug, info, info_span, instrument, warn, Instrument, Level, Span}; +use tracing_indicatif::span_ext::IndicatifSpanExt; use tvix_castore::import::fs::ingest_path; +use tvix_store::import::path_to_name; use tvix_store::nar::NarCalculationService; use tvix_store::utils::{ServiceUrls, ServiceUrlsGrpc}; +use tvix_tracing::TracingHandle; use tvix_castore::proto::blob_service_server::BlobServiceServer; use tvix_castore::proto::directory_service_server::DirectoryServiceServer; @@ -159,7 +163,10 @@ fn default_threads() -> usize { } #[instrument(skip_all)] -async fn run_cli(cli: Cli) -> Result<(), Box> { +async fn run_cli( + cli: Cli, + tracing_handle: TracingHandle, +) -> Result<(), Box> { match cli.command { Commands::Daemon { listen_args, @@ -242,37 +249,105 @@ async fn run_cli(cli: Cli) -> Result<(), Box = nar_calculation_service.into(); - let tasks = paths + // For each path passed, construct the name, or bail out if it's invalid. + let paths_and_names = paths .into_iter() - .map(|path| { - tokio::task::spawn({ - let blob_service = blob_service.clone(); - let directory_service = directory_service.clone(); - let path_info_service = path_info_service.clone(); - let nar_calculation_service = nar_calculation_service.clone(); + .map(|p| match path_to_name(&p) { + Ok(name) => { + let name = name.to_owned(); + Ok((p, name)) + } + Err(e) => Err(e), + }) + .collect::, _>>()?; - async move { - if let Ok(name) = tvix_store::import::path_to_name(&path) { - let resp = tvix_store::import::import_path_as_nar_ca( - &path, - name, - blob_service, - directory_service, - path_info_service, - nar_calculation_service, - ) - .await; - if let Ok(path_info) = resp { - // If the import was successful, print the path to stdout. - println!("{}", path_info.store_path.to_absolute_path()); - } + let imports_span = + info_span!("import paths", "indicatif.pb_show" = tracing::field::Empty); + imports_span.pb_set_message("Importing"); + imports_span.pb_set_length(paths_and_names.len() as u64); + imports_span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE); + imports_span.pb_start(); + + futures::stream::iter(paths_and_names) + .map(|(path, name)| { + let blob_service = blob_service.clone(); + let directory_service = directory_service.clone(); + let path_info_service = path_info_service.clone(); + let nar_calculation_service = nar_calculation_service.clone(); + let imports_span = imports_span.clone(); + let tracing_handle = tracing_handle.clone(); + + async move { + let span = Span::current(); + span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE); + span.pb_set_message(&format!("Ingesting {:?}", path)); + span.pb_start(); + + // Ingest the contents at the given path into castore. + let root_node = ingest_path::<_, _, _, &[u8]>( + blob_service, + directory_service, + &path, + None, + ) + .await + .map_err(std::io::Error::custom)?; + + span.pb_set_message(&format!("NAR Calculation for {:?}", path)); + + // Ask for the NAR size and sha256 + let (nar_size, nar_sha256) = + nar_calculation_service.calculate_nar(&root_node).await?; + + // Calculate the output path. This might still fail, as some names are illegal. + // FUTUREWORK: express the `name` at the type level to be valid and check for this earlier. + let ca = CAHash::Nar(NixHash::Sha256(nar_sha256)); + let output_path: StorePath = + nix_compat::store_path::build_ca_path::<&str, _, _>( + &name, + &ca, + [], + false, + ) + .map_err(|e| { + warn!(err=%e, "unable to build CA path"); + std::io::Error::custom(e) + })?; + + // Construct and insert PathInfo + match path_info_service + .as_ref() + .put(PathInfo { + store_path: output_path.to_owned(), + node: root_node, + // There's no reference scanning on imported paths + references: vec![], + nar_size, + nar_sha256, + signatures: vec![], + deriver: None, + ca: Some(ca), + }) + .await + { + // If the import was successful, print the path to stdout. + Ok(path_info) => { + use std::io::Write; + debug!(store_path=%path_info.store_path.to_absolute_path(), "imported path"); + writeln!(&mut tracing_handle.get_stdout_writer(), "{}", path_info.store_path.to_absolute_path())?; + imports_span.pb_inc(1); + Ok(()) + } + Err(e) => { + warn!(?path, err=%e, "failed to import"); + Err(e) } } - }) + }.instrument(info_span!("import path", "indicatif.pb_show" = tracing::field::Empty)) }) - .collect::>(); - - try_join_all(tasks).await?; + .buffer_unordered(50) + .try_collect() + .await?; } Commands::Copy { service_addrs, @@ -465,7 +540,7 @@ async fn main() -> Result<(), Box> { } Ok(()) }, - res = run_cli(cli) => { + res = run_cli(cli, tracing_handle.clone()) => { if let Err(e) = tracing_handle.shutdown().await { eprintln!("failed to shutdown tracing: {e}"); } diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs index afd85f267..07cdc4b1e 100644 --- a/tvix/store/src/nar/renderer.rs +++ b/tvix/store/src/nar/renderer.rs @@ -6,8 +6,7 @@ use nix_compat::nar::writer::r#async as nar_writer; use sha2::{Digest, Sha256}; use tokio::io::{self, AsyncWrite, BufReader}; use tonic::async_trait; -use tracing::{instrument, Span}; -use tracing_indicatif::span_ext::IndicatifSpanExt; +use tracing::instrument; use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Node}; pub struct SimpleRenderer { @@ -46,7 +45,7 @@ where /// Invoke [write_nar], and return the size and sha256 digest of the produced /// NAR output. -#[instrument(skip_all, fields(indicatif.pb_show=1))] +#[instrument(skip_all)] pub async fn calculate_size_and_sha256( root_node: &Node, blob_service: BS, @@ -59,10 +58,6 @@ where let mut h = Sha256::new(); let mut cw = CountWrite::from(&mut h); - let span = Span::current(); - span.pb_set_message("Calculating NAR"); - span.pb_start(); - write_nar( // The hasher doesn't speak async. It doesn't // actually do any I/O, so it's fine to wrap.