feat(tvix/tracing): Allow configuring additional layers

This will be used by tvix-daemon to write tracing data into the active
client's connection socket.

Change-Id: I8889dd0a638e004ee2c8cb312946b029c9779313
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12734
Tested-by: BuildkiteCI
Autosubmit: Vladimir Kryachko <v.kryachko@gmail.com>
Reviewed-by: flokli <flokli@flokli.de>
This commit is contained in:
Vova Kryachko 2024-11-05 12:49:21 -05:00 committed by clbot
parent c0c66f8bcc
commit 951d25676b

View file

@ -5,7 +5,11 @@ use tracing::level_filters::LevelFilter;
use tracing_indicatif::{ use tracing_indicatif::{
filter::IndicatifFilter, util::FilteredFormatFields, writer, IndicatifLayer, IndicatifWriter, filter::IndicatifFilter, util::FilteredFormatFields, writer, IndicatifLayer, IndicatifWriter,
}; };
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; use tracing_subscriber::{
layer::{Identity, SubscriberExt},
util::SubscriberInitExt,
EnvFilter, Layer, Registry,
};
#[cfg(feature = "otlp")] #[cfg(feature = "otlp")]
use opentelemetry::{ use opentelemetry::{
@ -175,74 +179,82 @@ impl TracingBuilder {
/// It will also configure otlp if the feature is enabled and a service_name was provided. It /// It will also configure otlp if the feature is enabled and a service_name was provided. It
/// will then correctly setup a channel which is later used for flushing the provider. /// will then correctly setup a channel which is later used for flushing the provider.
pub fn build(self) -> Result<TracingHandle, Error> { pub fn build(self) -> Result<TracingHandle, Error> {
self.build_with_additional(Identity::new())
}
/// Similar to `build()` but allows passing in an additional tracing [`Layer`].
///
/// This method is generic over the `Layer` to avoid the runtime cost of dynamic dispatch.
/// While it only allows passing a single `Layer`, it can be composed of multiple ones:
///
/// ```ignore
/// build_with_additional(
/// fmt::layer()
/// .and_then(some_other_layer)
/// .and_then(yet_another_layer)
/// .with_filter(my_filter)
/// )
/// ```
/// [`Layer`]: tracing_subscriber::layer::Layer
pub fn build_with_additional<L>(self, additional_layer: L) -> Result<TracingHandle, Error>
where
L: Layer<Registry> + Send + Sync + 'static,
{
// Set up the tracing subscriber. // Set up the tracing subscriber.
let indicatif_layer = IndicatifLayer::new().with_progress_style(PB_SPINNER_STYLE.clone()); let indicatif_layer = IndicatifLayer::new().with_progress_style(PB_SPINNER_STYLE.clone());
let stdout_writer = indicatif_layer.get_stdout_writer(); let stdout_writer = indicatif_layer.get_stdout_writer();
let stderr_writer = indicatif_layer.get_stderr_writer(); let stderr_writer = indicatif_layer.get_stderr_writer();
let subscriber = tracing_subscriber::registry()
.with( let layered = tracing_subscriber::fmt::Layer::new()
EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env()
.expect("invalid RUST_LOG"),
)
.with(
tracing_subscriber::fmt::Layer::new()
.fmt_fields(FilteredFormatFields::new( .fmt_fields(FilteredFormatFields::new(
tracing_subscriber::fmt::format::DefaultFields::new(), tracing_subscriber::fmt::format::DefaultFields::new(),
|field| field.name() != "indicatif.pb_show", |field| field.name() != "indicatif.pb_show",
)) ))
.with_writer(indicatif_layer.get_stderr_writer()) .with_writer(indicatif_layer.get_stderr_writer())
.compact(), .compact()
) .and_then((self.progess_bar).then(|| {
.with((self.progess_bar).then(|| {
indicatif_layer.with_filter( indicatif_layer.with_filter(
// only show progress for spans with indicatif.pb_show field being set // only show progress for spans with indicatif.pb_show field being set
IndicatifFilter::new(false), IndicatifFilter::new(false),
) )
})); }));
#[cfg(feature = "tracy")]
let layered = layered.and_then(TracyLayer::default());
let mut tx: Option<mpsc::Sender<Option<oneshot::Sender<()>>>> = None;
// Setup otlp if a service_name is configured // Setup otlp if a service_name is configured
#[cfg(feature = "otlp")] #[cfg(feature = "otlp")]
{ let layered = layered.and_then({
if let Some(service_name) = self.service_name { if let Some(service_name) = self.service_name {
// register a text map propagator for trace propagation // register a text map propagator for trace propagation
opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
let (tracer, tx) = gen_otlp_tracer(service_name.to_string()); let (tracer, sender) = gen_otlp_tracer(service_name.to_string());
tx = Some(sender);
// Create a tracing layer with the configured tracer // Create a tracing layer with the configured tracer
let layer = tracing_opentelemetry::layer().with_tracer(tracer); Some(tracing_opentelemetry::layer().with_tracer(tracer))
} else {
#[cfg(feature = "tracy")] None
{
subscriber
.with(TracyLayer::default())
.with(Some(layer))
.try_init()?;
} }
#[cfg(not(feature = "tracy"))]
{
subscriber.with(Some(layer)).try_init()?;
}
return Ok(TracingHandle {
tx: Some(tx),
stdout_writer,
stderr_writer,
}); });
}
} let layered = layered.with_filter(
#[cfg(feature = "tracy")] EnvFilter::builder()
{ .with_default_directive(LevelFilter::INFO.into())
subscriber.with(TracyLayer::default()).try_init()?; .from_env()
} .expect("invalid RUST_LOG"),
#[cfg(not(feature = "tracy"))] );
{
subscriber.try_init()?; tracing_subscriber::registry()
} // TODO: if additional_layer has global filters, there is a risk that it will disable the "default" ones,
// while it could be solved by registering `additional_layer` last, it requires boxing `additional_layer`.
.with(additional_layer)
.with(layered)
.try_init()?;
Ok(TracingHandle { Ok(TracingHandle {
tx: None, tx,
stdout_writer, stdout_writer,
stderr_writer, stderr_writer,
}) })