refactor(tvix/tracing): move otlp setup into helper function
Having all this in the main control flow makes it a bit hard to read. Moving it into a helper function makes it a bit cleaner.

Change-Id: Ibdb739dbd1e013b4f8c4aaf9b036a6bd556a1871
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11814
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: Simon Hauser <simon.hauser@helsinki-systems.de>
Tested-by: BuildkiteCI
This commit is contained in:
parent
118e69d81d
commit
d25f047b9d
1 changed files with 78 additions and 67 deletions
|
@ -6,7 +6,7 @@ use tracing_indicatif::{filter::IndicatifFilter, IndicatifLayer};
|
|||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
|
||||
|
||||
#[cfg(feature = "otlp")]
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry::{trace::Tracer, KeyValue};
|
||||
#[cfg(feature = "otlp")]
|
||||
use opentelemetry_sdk::{
|
||||
resource::{ResourceDetector, SdkProvidedResourceDetector},
|
||||
|
@ -170,72 +170,7 @@ impl TracingBuilder {
|
|||
#[cfg(feature = "otlp")]
|
||||
{
|
||||
if let Some(service_name) = self.service_name {
|
||||
let tracer = opentelemetry_otlp::new_pipeline()
|
||||
.tracing()
|
||||
.with_exporter(opentelemetry_otlp::new_exporter().tonic())
|
||||
.with_batch_config(
|
||||
BatchConfigBuilder::default()
|
||||
// the default values for `max_export_batch_size` is set to 512, which we will fill
|
||||
// pretty quickly, which will then result in an export. We want to make sure that
|
||||
// the export is only done once the schedule is met and not as soon as 512 spans
|
||||
// are collected.
|
||||
.with_max_export_batch_size(4096)
|
||||
// analog to default config `max_export_batch_size * 4`
|
||||
.with_max_queue_size(4096 * 4)
|
||||
// only force an export to the otlp collector every 10 seconds to reduce the amount
|
||||
// of error messages if an otlp collector is not available
|
||||
.with_scheduled_delay(std::time::Duration::from_secs(10))
|
||||
.build(),
|
||||
)
|
||||
.with_trace_config(opentelemetry_sdk::trace::config().with_resource({
|
||||
// use SdkProvidedResourceDetector.detect to detect resources,
|
||||
// but replace the default service name with our default.
|
||||
// https://github.com/open-telemetry/opentelemetry-rust/issues/1298
|
||||
let resources =
|
||||
SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0));
|
||||
// SdkProvidedResourceDetector currently always sets
|
||||
// `service.name`, but we don't like its default.
|
||||
if resources.get("service.name".into()).unwrap() == "unknown_service".into()
|
||||
{
|
||||
resources.merge(&Resource::new([KeyValue::new(
|
||||
"service.name",
|
||||
service_name,
|
||||
)]))
|
||||
} else {
|
||||
resources
|
||||
}
|
||||
}))
|
||||
.install_batch(opentelemetry_sdk::runtime::Tokio)
|
||||
.expect("Failed to install batch exporter using Tokio");
|
||||
|
||||
// Trace provider is need for later use like flushing the provider.
|
||||
// Needs to be kept around for each message to rx we need to handle.
|
||||
let tracer_provider = tracer
|
||||
.provider()
|
||||
.expect("Failed to get the tracer provider");
|
||||
|
||||
// Set up a channel for flushing trace providers later
|
||||
let (tx, mut rx) = mpsc::channel::<Option<oneshot::Sender<()>>>(16);
|
||||
|
||||
// Spawning a task that listens on rx for any message. Once we receive a message we
|
||||
// correctly call flush on the tracer_provider.
|
||||
tokio::spawn(async move {
|
||||
while let Some(m) = rx.recv().await {
|
||||
// Because of a bug within otlp we currently have to use spawn_blocking
|
||||
// otherwise will calling `force_flush` block forever, especially if the
|
||||
// tool was closed with ctrl_c. See
|
||||
// https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335
|
||||
let _ = tokio::task::spawn_blocking({
|
||||
let tracer_provider = tracer_provider.clone();
|
||||
move || tracer_provider.force_flush()
|
||||
})
|
||||
.await;
|
||||
if let Some(tx) = m {
|
||||
let _ = tx.send(());
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let (tracer, tx) = gen_otlp_tracer(service_name.to_string());
|
||||
// Create a tracing layer with the configured tracer
|
||||
let layer = tracing_opentelemetry::layer().with_tracer(tracer);
|
||||
subscriber.with(Some(layer)).try_init()?;
|
||||
|
@ -247,3 +182,79 @@ impl TracingBuilder {
|
|||
Ok(TracingHandle { tx: None })
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds the OTLP tracer and the channel used to request flushes.
///
/// Returns the tracer together with the TX half of a channel. Sending `None`
/// on it requests a flush of the tracer provider; sending `Some(sender)`
/// additionally signals on `sender` once that flush has been attempted.
///
/// `service_name` is used as `service.name` whenever resource detection
/// yields the SDK default (`unknown_service`) or no service name at all; any
/// other detected name is kept.
///
/// # Panics
///
/// Panics if the batch exporter cannot be installed or if the tracer has no
/// provider. The flush task is started with `tokio::spawn`, so this must be
/// called from within a Tokio runtime.
#[cfg(feature = "otlp")]
fn gen_otlp_tracer(
    service_name: String,
) -> (
    impl Tracer + tracing_opentelemetry::PreSampledTracer,
    mpsc::Sender<Option<oneshot::Sender<()>>>,
) {
    // Use SdkProvidedResourceDetector to detect resources, but replace the
    // default service name with ours.
    // https://github.com/open-telemetry/opentelemetry-rust/issues/1298
    let resource = {
        let detected = SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0));

        // SdkProvidedResourceDetector currently always sets `service.name`,
        // but we don't like its default. Should it ever stop setting the key,
        // fall back to our own name as well instead of panicking.
        let has_custom_name = detected
            .get("service.name".into())
            .is_some_and(|name| name != "unknown_service".into());

        if has_custom_name {
            detected
        } else {
            detected.merge(&Resource::new([KeyValue::new(
                "service.name",
                service_name,
            )]))
        }
    };

    let batch_config = BatchConfigBuilder::default()
        // The default value for `max_export_batch_size` is 512, which we fill
        // pretty quickly, resulting in an export. We want the export to only
        // happen once the schedule is met, not as soon as 512 spans are
        // collected.
        .with_max_export_batch_size(4096)
        // Analogous to the default config: `max_export_batch_size * 4`.
        .with_max_queue_size(4096 * 4)
        // Only force an export to the OTLP collector every 10 seconds, to
        // reduce the amount of error messages if no collector is available.
        .with_scheduled_delay(std::time::Duration::from_secs(10))
        .build();

    let tracer = opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(opentelemetry_otlp::new_exporter().tonic())
        .with_batch_config(batch_config)
        .with_trace_config(opentelemetry_sdk::trace::config().with_resource(resource))
        .install_batch(opentelemetry_sdk::runtime::Tokio)
        .expect("Failed to install batch exporter using Tokio");

    // The tracer provider is needed later to flush pending spans. It has to
    // be kept around for every message received on rx.
    let tracer_provider = tracer
        .provider()
        .expect("Failed to get the tracer provider");

    // Set up a channel for requesting flushes of the tracer provider later.
    let (tx, mut rx) = mpsc::channel::<Option<oneshot::Sender<()>>>(16);

    // Spawn a task that listens on rx. Every received message triggers a
    // flush of the tracer provider.
    tokio::spawn(async move {
        while let Some(m) = rx.recv().await {
            // Because of a bug within otlp we currently have to use
            // spawn_blocking, otherwise calling `force_flush` will block
            // forever, especially if the tool was closed with ctrl_c. See
            // https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335
            let _ = tokio::task::spawn_blocking({
                let tracer_provider = tracer_provider.clone();
                move || tracer_provider.force_flush()
            })
            .await;

            // Tell the requester (if any) that the flush was attempted. A
            // dropped receiver is not an error for us.
            if let Some(tx) = m {
                let _ = tx.send(());
            }
        }
    });

    (tracer, tx)
}
|
||||
|
|
Loading…
Reference in a new issue