feat(tvix/tracing): configure metrics support

This creates and registers a global meter provider, which uses the same
mechanism to get notified of flushes.

Change-Id: I856a67f0b282d494de3b2c2a1b79c06ae8ffe252
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12556
Reviewed-by: Jonas Chevalier <zimbatm@zimbatm.com>
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
This commit is contained in:
Florian Klink 2024-09-29 23:35:40 +02:00 committed by flokli
parent 6f1d059c7d
commit 5f670a2f67

View file

@ -12,10 +12,7 @@ use tracing_subscriber::{
}; };
#[cfg(feature = "otlp")] #[cfg(feature = "otlp")]
use opentelemetry::{ use opentelemetry::{trace::Tracer, KeyValue};
trace::{Tracer, TracerProvider},
KeyValue,
};
#[cfg(feature = "otlp")] #[cfg(feature = "otlp")]
use opentelemetry_sdk::{ use opentelemetry_sdk::{
propagation::TraceContextPropagator, propagation::TraceContextPropagator,
@ -228,8 +225,15 @@ impl TracingBuilder {
// register a text map propagator for trace propagation // register a text map propagator for trace propagation
opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
let (tracer, sender) = gen_otlp_tracer(service_name.to_string()); let (tracer, meter_provider, sender) =
gen_otlp_tracer_meter_provider(service_name.to_string());
flush_tx = Some(sender); flush_tx = Some(sender);
// Register the returned meter provider as the global one.
// FUTUREWORK: store in the struct and provide getter instead?
opentelemetry::global::set_meter_provider(meter_provider);
// Create a tracing layer with the configured tracer // Create a tracing layer with the configured tracer
Some(tracing_opentelemetry::layer().with_tracer(tracer)) Some(tracing_opentelemetry::layer().with_tracer(tracer))
} else { } else {
@ -319,17 +323,49 @@ fn gen_tracer_provider(
Ok(tracer_provider) Ok(tracer_provider)
} }
/// Returns an OTLP tracer, and the TX part of a channel, which can be used
/// to request flushes (and signal back the completion of the flush).
#[cfg(feature = "otlp")] #[cfg(feature = "otlp")]
fn gen_otlp_tracer( fn gen_meter_provider(
service_name: String,
) -> Result<opentelemetry_sdk::metrics::SdkMeterProvider, opentelemetry_sdk::metrics::MetricError> {
use std::time::Duration;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{
metrics::{PeriodicReader, SdkMeterProvider},
runtime,
};
let exporter = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_timeout(Duration::from_secs(10))
.build()?;
Ok(SdkMeterProvider::builder()
.with_reader(
PeriodicReader::builder(exporter, runtime::Tokio)
.with_interval(Duration::from_secs(3))
.with_timeout(Duration::from_secs(10))
.build(),
)
.with_resource(gen_resources(service_name))
.build())
}
/// Returns an OTLP tracer, and a meter provider, as well as the TX part
/// of a channel, which can be used to request flushes (and signal back the
/// completion of the flush).
#[cfg(feature = "otlp")]
fn gen_otlp_tracer_meter_provider(
service_name: String, service_name: String,
) -> ( ) -> (
impl Tracer + tracing_opentelemetry::PreSampledTracer + 'static, impl Tracer + tracing_opentelemetry::PreSampledTracer,
opentelemetry_sdk::metrics::SdkMeterProvider,
mpsc::Sender<oneshot::Sender<()>>, mpsc::Sender<oneshot::Sender<()>>,
) { ) {
use opentelemetry::trace::TracerProvider;
let tracer_provider = let tracer_provider =
gen_tracer_provider(service_name.clone()).expect("Unable to configure trace provider"); gen_tracer_provider(service_name.clone()).expect("Unable to configure trace provider");
let meter_provider =
gen_meter_provider(service_name).expect("Unable to configure meter provider");
// tracer_provider needs to be kept around so we can request flushes later. // tracer_provider needs to be kept around so we can request flushes later.
let tracer = tracer_provider.tracer("tvix"); let tracer = tracer_provider.tracer("tvix");
@ -339,23 +375,31 @@ fn gen_otlp_tracer(
// Spawning a task that listens on rx for any message. Once we receive a message we // Spawning a task that listens on rx for any message. Once we receive a message we
// correctly call flush on the tracer_provider. // correctly call flush on the tracer_provider.
tokio::spawn(async move { tokio::spawn({
while let Some(m) = flush_rx.recv().await { let meter_provider = meter_provider.clone();
// Because of a bug within otlp we currently have to use spawn_blocking
// otherwise will calling `force_flush` block forever, especially if the
// tool was closed with ctrl_c. See
// https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335
let _ = tokio::task::spawn_blocking({
let tracer_provider = tracer_provider.clone();
move || { async move {
tracer_provider.force_flush(); while let Some(m) = flush_rx.recv().await {
} // Because of a bug within otlp we currently have to use spawn_blocking
}) // otherwise will calling `force_flush` block forever, especially if the
.await; // tool was closed with ctrl_c. See
let _ = m.send(()); // https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335
let _ = tokio::task::spawn_blocking({
let tracer_provider = tracer_provider.clone();
let meter_provider = meter_provider.clone();
move || {
tracer_provider.force_flush();
if let Err(e) = meter_provider.force_flush() {
eprintln!("failed to flush meter provider: {}", e);
}
}
})
.await;
let _ = m.send(());
}
} }
}); });
(tracer, flush_tx) (tracer, meter_provider, flush_tx)
} }