feat(tvix/glue): report progress on all fetches, use progress bars

This should also report progress on fetches which we couldn't delay
until actually having to IO into them, like `builtins.fetchurl` calls
without a upfront-provided hash.

While at it, upgrade the progress spinners to progress bars, which
increment if we know the size of the fetch.

Change-Id: Ic3f332286d8bc2177f3d994ba25b165728d4b702
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11797
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: aspen <root@gws.fyi>
This commit is contained in:
Florian Klink 2024-06-13 10:26:35 +03:00 committed by clbot
parent 7ee55c293c
commit 99c5a2e8bc
3 changed files with 41 additions and 14 deletions

View file

@ -7,7 +7,6 @@ use crate::{
}; };
use nix_compat::nixhash; use nix_compat::nixhash;
use std::rc::Rc; use std::rc::Rc;
use tracing::info;
use tvix_eval::builtin_macros::builtins; use tvix_eval::builtin_macros::builtins;
use tvix_eval::generators::Gen; use tvix_eval::generators::Gen;
use tvix_eval::generators::GenCo; use tvix_eval::generators::GenCo;
@ -112,8 +111,6 @@ pub(crate) mod fetcher_builtins {
} }
None => { None => {
// If we don't have enough info, do the fetch now. // If we don't have enough info, do the fetch now.
info!(?fetch, "triggering required fetch");
let (store_path, _root_node) = state let (store_path, _root_node) = state
.tokio_handle .tokio_handle
.block_on(async { state.fetcher.ingest_and_persist(&name, fetch).await }) .block_on(async { state.fetcher.ingest_and_persist(&name, fetch).await })

View file

@ -8,7 +8,8 @@ use sha1::Sha1;
use sha2::{digest::Output, Digest, Sha256, Sha512}; use sha2::{digest::Output, Digest, Sha256, Sha512};
use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
use tokio_util::io::{InspectReader, InspectWriter}; use tokio_util::io::{InspectReader, InspectWriter};
use tracing::warn; use tracing::{instrument, warn, Span};
use tracing_indicatif::span_ext::IndicatifSpanExt;
use tvix_castore::{ use tvix_castore::{
blobservice::BlobService, blobservice::BlobService,
directoryservice::DirectoryService, directoryservice::DirectoryService,
@ -196,10 +197,18 @@ impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS> {
/// Constructs a HTTP request to the passed URL, and returns a AsyncReadBuf to it. /// Constructs a HTTP request to the passed URL, and returns a AsyncReadBuf to it.
/// In case the URI uses the file:// scheme, use tokio::fs to open it. /// In case the URI uses the file:// scheme, use tokio::fs to open it.
#[instrument(skip_all, fields(url, indicatif.pb_show=1), err)]
async fn download( async fn download(
&self, &self,
url: Url, url: Url,
) -> Result<Box<dyn AsyncBufRead + Unpin + Send>, FetcherError> { ) -> Result<Box<dyn AsyncBufRead + Unpin + Send>, FetcherError> {
let span = Span::current();
span.pb_set_message(&format!(
"📡Fetching {}",
// TOOD: maybe shorten
redact_url(&url)
));
match url.scheme() { match url.scheme() {
"file" => { "file" => {
let f = tokio::fs::File::open(url.to_file_path().map_err(|_| { let f = tokio::fs::File::open(url.to_file_path().map_err(|_| {
@ -211,12 +220,34 @@ impl<BS, DS, PS, NS> Fetcher<BS, DS, PS, NS> {
)) ))
})?) })?)
.await?; .await?;
Ok(Box::new(tokio::io::BufReader::new(f)))
span.pb_set_length(f.metadata().await?.len());
span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
span.pb_start();
Ok(Box::new(tokio::io::BufReader::new(InspectReader::new(
f,
move |d| {
span.pb_inc(d.len() as u64);
},
))))
} }
_ => { _ => {
let resp = self.http_client.get(url).send().await?; let resp = self.http_client.get(url).send().await?;
if let Some(content_length) = resp.content_length() {
span.pb_set_length(content_length);
span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
} else {
span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE);
}
span.pb_start();
Ok(Box::new(tokio_util::io::StreamReader::new( Ok(Box::new(tokio_util::io::StreamReader::new(
resp.bytes_stream().map_err(|e| { resp.bytes_stream()
.inspect_ok(move |d| {
span.pb_inc(d.len() as u64);
})
.map_err(|e| {
let e = e.without_url(); let e = e.without_url();
warn!(%e, "failed to get response body"); warn!(%e, "failed to get response body");
std::io::Error::new(std::io::ErrorKind::BrokenPipe, e) std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)

View file

@ -135,9 +135,6 @@ impl TvixStoreIO {
// it for things like <nixpkgs> pointing to a store path. // it for things like <nixpkgs> pointing to a store path.
// In the future, these things will (need to) have PathInfo. // In the future, these things will (need to) have PathInfo.
None => { None => {
let span = Span::current();
span.pb_start();
span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE);
// The store path doesn't exist yet, so we need to fetch or build it. // The store path doesn't exist yet, so we need to fetch or build it.
// We check for fetches first, as we might have both native // We check for fetches first, as we might have both native
// fetchers and FODs in KnownPaths, and prefer the former. // fetchers and FODs in KnownPaths, and prefer the former.
@ -150,7 +147,6 @@ impl TvixStoreIO {
match maybe_fetch { match maybe_fetch {
Some((name, fetch)) => { Some((name, fetch)) => {
span.pb_set_message(&format!("⏳Fetching {}", &store_path));
let (sp, root_node) = self let (sp, root_node) = self
.fetcher .fetcher
.ingest_and_persist(&name, fetch) .ingest_and_persist(&name, fetch)
@ -183,6 +179,9 @@ impl TvixStoreIO {
} }
} }
}; };
let span = Span::current();
span.pb_start();
span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE);
span.pb_set_message(&format!("🔨Building {}", &store_path)); span.pb_set_message(&format!("🔨Building {}", &store_path));
// derivation_to_build_request needs castore nodes for all inputs. // derivation_to_build_request needs castore nodes for all inputs.