diff --git a/tvix/tracing/src/lib.rs b/tvix/tracing/src/lib.rs index c7e4e2751..16a003c71 100644 --- a/tvix/tracing/src/lib.rs +++ b/tvix/tracing/src/lib.rs @@ -12,10 +12,7 @@ use tracing_subscriber::{ }; #[cfg(feature = "otlp")] -use opentelemetry::{ - trace::{Tracer, TracerProvider}, - KeyValue, -}; +use opentelemetry::{trace::Tracer, KeyValue}; #[cfg(feature = "otlp")] use opentelemetry_sdk::{ propagation::TraceContextPropagator, @@ -228,8 +225,15 @@ impl TracingBuilder { // register a text map propagator for trace propagation opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); - let (tracer, sender) = gen_otlp_tracer(service_name.to_string()); + let (tracer, meter_provider, sender) = + gen_otlp_tracer_meter_provider(service_name.to_string()); + flush_tx = Some(sender); + + // Register the returned meter provider as the global one. + // FUTUREWORK: store in the struct and provide getter instead? + opentelemetry::global::set_meter_provider(meter_provider); + // Create a tracing layer with the configured tracer Some(tracing_opentelemetry::layer().with_tracer(tracer)) } else { @@ -319,17 +323,49 @@ fn gen_tracer_provider( Ok(tracer_provider) } -/// Returns an OTLP tracer, and the TX part of a channel, which can be used -/// to request flushes (and signal back the completion of the flush). #[cfg(feature = "otlp")] -fn gen_otlp_tracer( +fn gen_meter_provider( + service_name: String, +) -> Result { + use std::time::Duration; + + use opentelemetry_otlp::WithExportConfig; + use opentelemetry_sdk::{ + metrics::{PeriodicReader, SdkMeterProvider}, + runtime, + }; + let exporter = opentelemetry_otlp::MetricExporter::builder() + .with_tonic() + .with_timeout(Duration::from_secs(10)) + .build()?; + + Ok(SdkMeterProvider::builder() + .with_reader( + PeriodicReader::builder(exporter, runtime::Tokio) + .with_interval(Duration::from_secs(3)) + .with_timeout(Duration::from_secs(10)) + .build(), + ) + .with_resource(gen_resources(service_name)) + .build()) +} + +/// Returns an OTLP tracer, and a meter provider, as well as the TX part +/// of a channel, which can be used to request flushes (and signal back the +/// completion of the flush). +#[cfg(feature = "otlp")] +fn gen_otlp_tracer_meter_provider( service_name: String, ) -> ( - impl Tracer + tracing_opentelemetry::PreSampledTracer + 'static, + impl Tracer + tracing_opentelemetry::PreSampledTracer, + opentelemetry_sdk::metrics::SdkMeterProvider, mpsc::Sender>, ) { + use opentelemetry::trace::TracerProvider; let tracer_provider = gen_tracer_provider(service_name.clone()).expect("Unable to configure trace provider"); + let meter_provider = + gen_meter_provider(service_name).expect("Unable to configure meter provider"); // tracer_provider needs to be kept around so we can request flushes later. let tracer = tracer_provider.tracer("tvix"); @@ -339,23 +375,31 @@ fn gen_otlp_tracer( // Spawning a task that listens on rx for any message. Once we receive a message we // correctly call flush on the tracer_provider. - tokio::spawn(async move { - while let Some(m) = flush_rx.recv().await { - // Because of a bug within otlp we currently have to use spawn_blocking - // otherwise will calling `force_flush` block forever, especially if the - // tool was closed with ctrl_c. See - // https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335 - let _ = tokio::task::spawn_blocking({ - let tracer_provider = tracer_provider.clone(); + tokio::spawn({ + let meter_provider = meter_provider.clone(); - move || { - tracer_provider.force_flush(); - } - }) - .await; - let _ = m.send(()); + async move { + while let Some(m) = flush_rx.recv().await { + // Because of a bug within otlp we currently have to use spawn_blocking + // otherwise will calling `force_flush` block forever, especially if the + // tool was closed with ctrl_c. See + // https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335 + let _ = tokio::task::spawn_blocking({ + let tracer_provider = tracer_provider.clone(); + let meter_provider = meter_provider.clone(); + + move || { + tracer_provider.force_flush(); + if let Err(e) = meter_provider.force_flush() { + eprintln!("failed to flush meter provider: {}", e); + } + } + }) + .await; + let _ = m.send(()); + } } }); - (tracer, flush_tx) + (tracer, meter_provider, flush_tx) }