refactor(tvix): make indicatif.pb_show=1 more explicit

This pushes generating spans with pb_show up to the caller.
They usually have more context on how to present things, if at all.

Change-Id: Icfcaa64a8a57dce50c0261f2d06e7c051e3946c2
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12657
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
This commit is contained in:
Florian Klink 2024-10-15 16:16:11 +03:00 committed by flokli
parent 3fda90602d
commit d52d889f2b
3 changed files with 125 additions and 46 deletions

View file

@ -8,7 +8,9 @@ use std::os::unix::fs::MetadataExt;
use std::os::unix::fs::PermissionsExt;
use tokio::io::BufReader;
use tokio_util::io::InspectReader;
use tracing::info_span;
use tracing::instrument;
use tracing::Instrument;
use tracing::Span;
use tracing_indicatif::span_ext::IndicatifSpanExt;
use walkdir::DirEntry;
@ -30,7 +32,11 @@ use super::IngestionError;
///
/// This function will walk the filesystem using `walkdir` and will consume
/// `O(#number of entries)` space.
#[instrument(skip(blob_service, directory_service, reference_scanner), fields(path, indicatif.pb_show=1), err)]
#[instrument(
skip(blob_service, directory_service, reference_scanner),
fields(path),
err
)]
pub async fn ingest_path<BS, DS, P, P2>(
blob_service: BS,
directory_service: DS,
@ -44,8 +50,6 @@ where
P2: AsRef<[u8]> + Send + Sync,
{
let span = Span::current();
span.pb_set_message(&format!("Ingesting {:?}", path));
span.pb_start();
let iter = WalkDir::new(path.as_ref())
.follow_links(false)
@ -158,8 +162,15 @@ where
.metadata()
.map_err(|e| Error::Stat(entry.path().to_path_buf(), e.into()))?;
let digest =
upload_blob(blob_service, entry.path().to_path_buf(), reference_scanner).await?;
let digest = upload_blob(blob_service, entry.path().to_path_buf(), reference_scanner)
.instrument({
let span = info_span!("upload_blob", "indicatif.pb_show" = tracing::field::Empty);
span.pb_set_message(&format!("Uploading blob for {:?}", fs_path));
span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE);
span
})
.await?;
Ok(IngestionEntry::Regular {
path,
@ -175,7 +186,7 @@ where
}
/// Uploads the file at the provided [Path] the the [BlobService].
#[instrument(skip(blob_service, reference_scanner), fields(path, indicatif.pb_show=1), err)]
#[instrument(skip(blob_service, reference_scanner), fields(path), err)]
async fn upload_blob<BS, P>(
blob_service: BS,
path: impl AsRef<std::path::Path>,
@ -186,8 +197,6 @@ where
P: AsRef<[u8]>,
{
let span = Span::current();
span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE);
span.pb_set_message(&format!("Uploading blob for {:?}", path.as_ref()));
span.pb_start();
let file = tokio::fs::File::open(path.as_ref())

View file

@ -1,9 +1,11 @@
use clap::Parser;
use clap::Subcommand;
use futures::future::try_join_all;
use futures::StreamExt;
use futures::TryStreamExt;
use nix_compat::nix_daemon::de::Error;
use nix_compat::nixhash::CAHash;
use nix_compat::nixhash::NixHash;
use nix_compat::{path_info::ExportedPathInfo, store_path::StorePath};
use serde::Deserialize;
use serde::Serialize;
@ -12,11 +14,13 @@ use std::sync::Arc;
use tonic::transport::Server;
use tower::ServiceBuilder;
use tower_http::trace::{DefaultMakeSpan, TraceLayer};
use tracing::{info, info_span, instrument, Level, Span};
use tracing_indicatif::span_ext::IndicatifSpanExt as _;
use tracing::{debug, info, info_span, instrument, warn, Instrument, Level, Span};
use tracing_indicatif::span_ext::IndicatifSpanExt;
use tvix_castore::import::fs::ingest_path;
use tvix_store::import::path_to_name;
use tvix_store::nar::NarCalculationService;
use tvix_store::utils::{ServiceUrls, ServiceUrlsGrpc};
use tvix_tracing::TracingHandle;
use tvix_castore::proto::blob_service_server::BlobServiceServer;
use tvix_castore::proto::directory_service_server::DirectoryServiceServer;
@ -159,7 +163,10 @@ fn default_threads() -> usize {
}
#[instrument(skip_all)]
async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
async fn run_cli(
cli: Cli,
tracing_handle: TracingHandle,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
match cli.command {
Commands::Daemon {
listen_args,
@ -242,37 +249,105 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
let nar_calculation_service: Arc<dyn NarCalculationService> =
nar_calculation_service.into();
let tasks = paths
// For each path passed, construct the name, or bail out if it's invalid.
let paths_and_names = paths
.into_iter()
.map(|path| {
tokio::task::spawn({
let blob_service = blob_service.clone();
let directory_service = directory_service.clone();
let path_info_service = path_info_service.clone();
let nar_calculation_service = nar_calculation_service.clone();
.map(|p| match path_to_name(&p) {
Ok(name) => {
let name = name.to_owned();
Ok((p, name))
}
Err(e) => Err(e),
})
.collect::<Result<Vec<_>, _>>()?;
async move {
if let Ok(name) = tvix_store::import::path_to_name(&path) {
let resp = tvix_store::import::import_path_as_nar_ca(
&path,
name,
blob_service,
directory_service,
path_info_service,
nar_calculation_service,
)
.await;
if let Ok(path_info) = resp {
// If the import was successful, print the path to stdout.
println!("{}", path_info.store_path.to_absolute_path());
}
let imports_span =
info_span!("import paths", "indicatif.pb_show" = tracing::field::Empty);
imports_span.pb_set_message("Importing");
imports_span.pb_set_length(paths_and_names.len() as u64);
imports_span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
imports_span.pb_start();
futures::stream::iter(paths_and_names)
.map(|(path, name)| {
let blob_service = blob_service.clone();
let directory_service = directory_service.clone();
let path_info_service = path_info_service.clone();
let nar_calculation_service = nar_calculation_service.clone();
let imports_span = imports_span.clone();
let tracing_handle = tracing_handle.clone();
async move {
let span = Span::current();
span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE);
span.pb_set_message(&format!("Ingesting {:?}", path));
span.pb_start();
// Ingest the contents at the given path into castore.
let root_node = ingest_path::<_, _, _, &[u8]>(
blob_service,
directory_service,
&path,
None,
)
.await
.map_err(std::io::Error::custom)?;
span.pb_set_message(&format!("NAR Calculation for {:?}", path));
// Ask for the NAR size and sha256
let (nar_size, nar_sha256) =
nar_calculation_service.calculate_nar(&root_node).await?;
// Calculate the output path. This might still fail, as some names are illegal.
// FUTUREWORK: express the `name` at the type level to be valid and check for this earlier.
let ca = CAHash::Nar(NixHash::Sha256(nar_sha256));
let output_path: StorePath<String> =
nix_compat::store_path::build_ca_path::<&str, _, _>(
&name,
&ca,
[],
false,
)
.map_err(|e| {
warn!(err=%e, "unable to build CA path");
std::io::Error::custom(e)
})?;
// Construct and insert PathInfo
match path_info_service
.as_ref()
.put(PathInfo {
store_path: output_path.to_owned(),
node: root_node,
// There's no reference scanning on imported paths
references: vec![],
nar_size,
nar_sha256,
signatures: vec![],
deriver: None,
ca: Some(ca),
})
.await
{
// If the import was successful, print the path to stdout.
Ok(path_info) => {
use std::io::Write;
debug!(store_path=%path_info.store_path.to_absolute_path(), "imported path");
writeln!(&mut tracing_handle.get_stdout_writer(), "{}", path_info.store_path.to_absolute_path())?;
imports_span.pb_inc(1);
Ok(())
}
Err(e) => {
warn!(?path, err=%e, "failed to import");
Err(e)
}
}
})
}.instrument(info_span!("import path", "indicatif.pb_show" = tracing::field::Empty))
})
.collect::<Vec<_>>();
try_join_all(tasks).await?;
.buffer_unordered(50)
.try_collect()
.await?;
}
Commands::Copy {
service_addrs,
@ -465,7 +540,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
}
Ok(())
},
res = run_cli(cli) => {
res = run_cli(cli, tracing_handle.clone()) => {
if let Err(e) = tracing_handle.shutdown().await {
eprintln!("failed to shutdown tracing: {e}");
}

View file

@ -6,8 +6,7 @@ use nix_compat::nar::writer::r#async as nar_writer;
use sha2::{Digest, Sha256};
use tokio::io::{self, AsyncWrite, BufReader};
use tonic::async_trait;
use tracing::{instrument, Span};
use tracing_indicatif::span_ext::IndicatifSpanExt;
use tracing::instrument;
use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Node};
pub struct SimpleRenderer<BS, DS> {
@ -46,7 +45,7 @@ where
/// Invoke [write_nar], and return the size and sha256 digest of the produced
/// NAR output.
#[instrument(skip_all, fields(indicatif.pb_show=1))]
#[instrument(skip_all)]
pub async fn calculate_size_and_sha256<BS, DS>(
root_node: &Node,
blob_service: BS,
@ -59,10 +58,6 @@ where
let mut h = Sha256::new();
let mut cw = CountWrite::from(&mut h);
let span = Span::current();
span.pb_set_message("Calculating NAR");
span.pb_start();
write_nar(
// The hasher doesn't speak async. It doesn't
// actually do any I/O, so it's fine to wrap.