There's no more consumers left. Change-Id: I0585abbdbe0ffcf35cd20ac58077ade67cbf5c75 Reviewed-on: https://cl.tvl.fyi/c/depot/+/12715 Reviewed-by: Ilan Joselevich <personal@ilanjoselevich.com> Tested-by: BuildkiteCI Autosubmit: flokli <flokli@flokli.de>
323 lines
13 KiB
Rust
323 lines
13 KiB
Rust
use indicatif::ProgressStyle;
|
|
use std::sync::LazyLock;
|
|
use tokio::sync::{mpsc, oneshot};
|
|
use tracing::level_filters::LevelFilter;
|
|
use tracing_indicatif::{
|
|
filter::IndicatifFilter, util::FilteredFormatFields, writer, IndicatifLayer, IndicatifWriter,
|
|
};
|
|
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
|
|
|
|
#[cfg(feature = "otlp")]
|
|
use opentelemetry::{
|
|
trace::{Tracer, TracerProvider},
|
|
KeyValue,
|
|
};
|
|
#[cfg(feature = "otlp")]
|
|
use opentelemetry_sdk::{
|
|
propagation::TraceContextPropagator,
|
|
resource::{ResourceDetector, SdkProvidedResourceDetector},
|
|
trace::BatchConfigBuilder,
|
|
Resource,
|
|
};
|
|
#[cfg(feature = "tracy")]
|
|
use tracing_tracy::TracyLayer;
|
|
|
|
pub mod propagate;
|
|
|
|
pub static PB_PROGRESS_STYLE: LazyLock<ProgressStyle> = LazyLock::new(|| {
|
|
ProgressStyle::with_template(
|
|
"{span_child_prefix} {wide_msg} {bar:10} ({elapsed}) {pos:>7}/{len:7}",
|
|
)
|
|
.expect("invalid progress template")
|
|
});
|
|
pub static PB_TRANSFER_STYLE: LazyLock<ProgressStyle> = LazyLock::new(|| {
|
|
ProgressStyle::with_template(
|
|
"{span_child_prefix} {wide_msg} {binary_bytes:>7}/{binary_total_bytes:7}@{decimal_bytes_per_sec} ({elapsed}) {bar:10} "
|
|
)
|
|
.expect("invalid progress template")
|
|
});
|
|
pub static PB_SPINNER_STYLE: LazyLock<ProgressStyle> = LazyLock::new(|| {
|
|
ProgressStyle::with_template(
|
|
"{span_child_prefix}{spinner} {wide_msg} ({elapsed}) {pos:>7}/{len:7}",
|
|
)
|
|
.expect("invalid progress template")
|
|
});
|
|
|
|
#[derive(thiserror::Error, Debug)]
|
|
pub enum Error {
|
|
#[error(transparent)]
|
|
Init(#[from] tracing_subscriber::util::TryInitError),
|
|
|
|
#[error(transparent)]
|
|
MpscSend(#[from] mpsc::error::SendError<Option<oneshot::Sender<()>>>),
|
|
|
|
#[error(transparent)]
|
|
OneshotRecv(#[from] oneshot::error::RecvError),
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct TracingHandle {
|
|
tx: Option<mpsc::Sender<Option<oneshot::Sender<()>>>>,
|
|
|
|
stdout_writer: IndicatifWriter<writer::Stdout>,
|
|
stderr_writer: IndicatifWriter<writer::Stderr>,
|
|
}
|
|
|
|
impl TracingHandle {
|
|
/// Returns a writer for [std::io::Stdout] that ensures its output will not be clobbered by
|
|
/// active progress bars.
|
|
///
|
|
/// Instead of `println!(...)` prefer `writeln!(handle.get_stdout_writer(), ...)`
|
|
pub fn get_stdout_writer(&self) -> IndicatifWriter<writer::Stdout> {
|
|
// clone is fine here because its only a wrapper over an `Arc`
|
|
self.stdout_writer.clone()
|
|
}
|
|
|
|
/// Returns a writer for [std::io::Stderr] that ensures its output will not be clobbered by
|
|
/// active progress bars.
|
|
///
|
|
/// Instead of `println!(...)` prefer `writeln!(handle.get_stderr_writer(), ...)`.
|
|
pub fn get_stderr_writer(&self) -> IndicatifWriter<writer::Stderr> {
|
|
// clone is fine here because its only a wrapper over an `Arc`
|
|
self.stderr_writer.clone()
|
|
}
|
|
|
|
/// This will flush possible attached tracing providers, e.g. otlp exported, if enabled.
|
|
/// If there is none enabled this will result in a noop.
|
|
///
|
|
/// It will not wait until the flush is complete, but you can pass in an oneshot::Sender which
|
|
/// will receive a message once the flush is completed.
|
|
pub async fn flush(&self, msg: Option<oneshot::Sender<()>>) -> Result<(), Error> {
|
|
if let Some(tx) = &self.tx {
|
|
Ok(tx.send(msg).await?)
|
|
} else {
|
|
// If we have a message passed in we need to notify the receiver
|
|
if let Some(tx) = msg {
|
|
let _ = tx.send(());
|
|
}
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
/// This will flush all all attached tracing providers and will wait until the flush is completed.
|
|
/// If no tracing providers like otlp are attached then this will be a noop.
|
|
///
|
|
/// This should only be called on a regular shutdown.
|
|
/// If you correctly need to shutdown tracing on ctrl_c use [force_shutdown](#method.force_shutdown)
|
|
/// otherwise you will get otlp errors.
|
|
pub async fn shutdown(&self) -> Result<(), Error> {
|
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
|
self.flush(Some(tx)).await?;
|
|
rx.await?;
|
|
Ok(())
|
|
}
|
|
|
|
/// This will flush all all attached tracing providers and will wait until the flush is completed.
|
|
/// After this it will do some other necessary cleanup.
|
|
/// If no tracing providers like otlp are attached then this will be a noop.
|
|
///
|
|
/// This should only be used if the tool received an ctrl_c otherwise you will get otlp errors.
|
|
/// If you need to shutdown tracing on a regular exit, you should use the [shutdown](#method.shutdown)
|
|
/// method.
|
|
pub async fn force_shutdown(&self) -> Result<(), Error> {
|
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
|
self.flush(Some(tx)).await?;
|
|
rx.await?;
|
|
|
|
#[cfg(feature = "otlp")]
|
|
{
|
|
// Because of a bug within otlp we currently have to use spawn_blocking otherwise
|
|
// calling `shutdown_tracer_provider` can block forever. See
|
|
// https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335
|
|
//
|
|
// This still throws an error, if the tool exits regularly: "OpenTelemetry trace error
|
|
// occurred. oneshot canceled", but not having this leads to errors if we cancel with
|
|
// ctrl_c.
|
|
// So this should right now only be used on ctrl_c, for a regular exit use the
|
|
// [shutdown](#shutdown) method
|
|
let _ = tokio::task::spawn_blocking(move || {
|
|
opentelemetry::global::shutdown_tracer_provider();
|
|
})
|
|
.await;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[derive(Default)]
|
|
pub struct TracingBuilder {
|
|
progess_bar: bool,
|
|
|
|
#[cfg(feature = "otlp")]
|
|
service_name: Option<&'static str>,
|
|
}
|
|
|
|
impl TracingBuilder {
|
|
#[cfg(feature = "otlp")]
|
|
/// Enable otlp by setting a custom service_name
|
|
pub fn enable_otlp(mut self, service_name: &'static str) -> TracingBuilder {
|
|
self.service_name = Some(service_name);
|
|
self
|
|
}
|
|
|
|
/// Enable progress bar layer, default is disabled
|
|
pub fn enable_progressbar(mut self) -> TracingBuilder {
|
|
self.progess_bar = true;
|
|
self
|
|
}
|
|
|
|
/// This will setup tracing based on the configuration passed in.
|
|
/// It will setup a stderr writer output layer and configure EnvFilter to honor RUST_LOG.
|
|
/// The EnvFilter will be applied to all configured layers, also otlp.
|
|
///
|
|
/// It will also configure otlp if the feature is enabled and a service_name was provided. It
|
|
/// will then correctly setup a channel which is later used for flushing the provider.
|
|
pub fn build(self) -> Result<TracingHandle, Error> {
|
|
// Set up the tracing subscriber.
|
|
let indicatif_layer = IndicatifLayer::new().with_progress_style(PB_SPINNER_STYLE.clone());
|
|
let stdout_writer = indicatif_layer.get_stdout_writer();
|
|
let stderr_writer = indicatif_layer.get_stderr_writer();
|
|
let subscriber = tracing_subscriber::registry()
|
|
.with(
|
|
EnvFilter::builder()
|
|
.with_default_directive(LevelFilter::INFO.into())
|
|
.from_env()
|
|
.expect("invalid RUST_LOG"),
|
|
)
|
|
.with(
|
|
tracing_subscriber::fmt::Layer::new()
|
|
.fmt_fields(FilteredFormatFields::new(
|
|
tracing_subscriber::fmt::format::DefaultFields::new(),
|
|
|field| field.name() != "indicatif.pb_show",
|
|
))
|
|
.with_writer(indicatif_layer.get_stderr_writer())
|
|
.compact(),
|
|
)
|
|
.with((self.progess_bar).then(|| {
|
|
indicatif_layer.with_filter(
|
|
// only show progress for spans with indicatif.pb_show field being set
|
|
IndicatifFilter::new(false),
|
|
)
|
|
}));
|
|
|
|
// Setup otlp if a service_name is configured
|
|
#[cfg(feature = "otlp")]
|
|
{
|
|
if let Some(service_name) = self.service_name {
|
|
// register a text map propagator for trace propagation
|
|
opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
|
|
|
|
let (tracer, tx) = gen_otlp_tracer(service_name.to_string());
|
|
// Create a tracing layer with the configured tracer
|
|
let layer = tracing_opentelemetry::layer().with_tracer(tracer);
|
|
|
|
#[cfg(feature = "tracy")]
|
|
{
|
|
subscriber
|
|
.with(TracyLayer::default())
|
|
.with(Some(layer))
|
|
.try_init()?;
|
|
}
|
|
|
|
#[cfg(not(feature = "tracy"))]
|
|
{
|
|
subscriber.with(Some(layer)).try_init()?;
|
|
}
|
|
return Ok(TracingHandle {
|
|
tx: Some(tx),
|
|
stdout_writer,
|
|
stderr_writer,
|
|
});
|
|
}
|
|
}
|
|
#[cfg(feature = "tracy")]
|
|
{
|
|
subscriber.with(TracyLayer::default()).try_init()?;
|
|
}
|
|
#[cfg(not(feature = "tracy"))]
|
|
{
|
|
subscriber.try_init()?;
|
|
}
|
|
|
|
Ok(TracingHandle {
|
|
tx: None,
|
|
stdout_writer,
|
|
stderr_writer,
|
|
})
|
|
}
|
|
}
|
|
|
|
/// Returns an OTLP tracer, and the TX part of a channel, which can be used
/// to request flushes (and signal back the completion of the flush).
///
/// `service_name` is reported as `service.name`, unless the detected resources already carry
/// a name other than the `unknown_service` default.
///
/// # Panics
///
/// Panics if the batch exporter cannot be installed.
#[cfg(feature = "otlp")]
fn gen_otlp_tracer(
    service_name: String,
) -> (
    impl Tracer + tracing_opentelemetry::PreSampledTracer,
    mpsc::Sender<Option<oneshot::Sender<()>>>,
) {
    let batch_config = BatchConfigBuilder::default()
        // the default values for `max_export_batch_size` is set to 512, which we will fill
        // pretty quickly, which will then result in an export. We want to make sure that
        // the export is only done once the schedule is met and not as soon as 512 spans
        // are collected.
        .with_max_export_batch_size(4096)
        // analog to default config `max_export_batch_size * 4`
        .with_max_queue_size(4096 * 4)
        // only force an export to the otlp collector every 10 seconds to reduce the amount
        // of error messages if an otlp collector is not available
        .with_scheduled_delay(std::time::Duration::from_secs(10))
        .build();

    let resource = {
        // use SdkProvidedResourceDetector.detect to detect resources,
        // but replace the default service name with our default.
        // https://github.com/open-telemetry/opentelemetry-rust/issues/1298
        let detected = SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0));

        // SdkProvidedResourceDetector currently always sets `service.name`, but we don't like
        // its default. Should it ever stop setting it, fall back to our name as well instead
        // of panicking.
        let has_custom_name = detected
            .get("service.name".into())
            .is_some_and(|name| name != opentelemetry::Value::from("unknown_service"));

        if has_custom_name {
            detected
        } else {
            detected.merge(&Resource::new([KeyValue::new(
                "service.name",
                service_name,
            )]))
        }
    };

    let tracer_provider = opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(opentelemetry_otlp::new_exporter().tonic())
        .with_batch_config(batch_config)
        .with_trace_config(opentelemetry_sdk::trace::Config::default().with_resource(resource))
        .install_batch(opentelemetry_sdk::runtime::Tokio)
        .expect("Failed to install batch exporter using Tokio");

    // The tracer provider is needed later on for flushing. It is moved into the task spawned
    // below and cloned for every flush request that task handles.
    let tracer = tracer_provider.tracer("tvix");

    // Set up a channel for flushing trace providers later
    let (tx, mut rx) = mpsc::channel::<Option<oneshot::Sender<()>>>(16);

    // Spawning a task that listens on rx for any message. Once we receive a message we
    // call flush on the tracer_provider.
    tokio::spawn(async move {
        while let Some(flush_done) = rx.recv().await {
            // Because of a bug within otlp we currently have to use spawn_blocking,
            // otherwise calling `force_flush` will block forever, especially if the
            // tool was closed with ctrl_c. See
            // https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335
            let _ = tokio::task::spawn_blocking({
                let tracer_provider = tracer_provider.clone();
                move || tracer_provider.force_flush()
            })
            .await;

            // Signal completion to whoever requested the flush, if they asked for it.
            if let Some(done) = flush_done {
                let _ = done.send(());
            }
        }
    });

    (tracer, tx)
}
|