feat(tvix/tracing): correctly close otlp on exit

Provide a new interface for forcing a flush of otlp traces and use this
interface to shutdown otlp prior to exiting tvix-store, either if the
tool was stopped with a SIGTERM or ended regularly.
This also fixes an issue where traces were not even exported if for
example we just imported 10 paths and never even emitted more than 256
traces. The implementation uses a mpsc channel so a flush can be done
without having to wait for it to complete. If you want to wait for a
flush to complete you can provide a oneshot channel which will receive a
message once flushing is complete.

Because of a otlp bug `force_flush` as well as
`shutdown_tracer_provider` need to be executed using `spawn_blocking`
otherwise the function will deadlock. See
https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335

Change-Id: I0a828391adfb1f72dc8305f62ced8cba0515847c
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11803
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Autosubmit: Simon Hauser <simon.hauser@helsinki-systems.de>
This commit is contained in:
Simon Hauser 2024-06-10 17:49:45 +02:00 committed by clbot
parent 5077ca70de
commit fa7ed39bf4
7 changed files with 241 additions and 64 deletions

2
tvix/Cargo.lock generated
View file

@ -4372,6 +4372,8 @@ dependencies = [
"opentelemetry", "opentelemetry",
"opentelemetry-otlp", "opentelemetry-otlp",
"opentelemetry_sdk", "opentelemetry_sdk",
"thiserror",
"tokio",
"tracing", "tracing",
"tracing-indicatif", "tracing-indicatif",
"tracing-opentelemetry", "tracing-opentelemetry",

View file

@ -14007,6 +14007,15 @@ rec {
optional = true; optional = true;
features = [ "rt-tokio" ]; features = [ "rt-tokio" ];
} }
{
name = "thiserror";
packageId = "thiserror";
}
{
name = "tokio";
packageId = "tokio";
features = [ "sync" "rt" ];
}
{ {
name = "tracing"; name = "tracing";
packageId = "tracing"; packageId = "tracing";

View file

@ -54,7 +54,7 @@ enum Commands {
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
let cli = Cli::parse(); let cli = Cli::parse();
tvix_tracing::init(cli.log_level)?; let _ = tvix_tracing::TracingBuilder::default().level(cli.log_level);
match cli.command { match cli.command {
Commands::Daemon { Commands::Daemon {

View file

@ -279,7 +279,10 @@ fn lint(code: &str, path: Option<PathBuf>, args: &Args) -> bool {
fn main() { fn main() {
let args = Args::parse(); let args = Args::parse();
tvix_tracing::init(args.log_level).expect("unable to set up tracing subscriber"); let _ = tvix_tracing::TracingBuilder::default()
.level(args.log_level)
.build()
.expect("unable to set up tracing subscriber");
let tokio_runtime = tokio::runtime::Runtime::new().expect("failed to setup tokio runtime"); let tokio_runtime = tokio::runtime::Runtime::new().expect("failed to setup tokio runtime");
let io_handle = init_io_handle(&tokio_runtime, &args); let io_handle = init_io_handle(&tokio_runtime, &args);

View file

@ -199,25 +199,8 @@ fn default_threads() -> usize {
1 1
} }
#[tokio::main] #[instrument(fields(indicatif.pb_show=1), skip(cli))]
#[instrument(fields(indicatif.pb_show=1))] async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let cli = Cli::parse();
#[cfg(feature = "otlp")]
{
if cli.otlp {
tvix_tracing::init_with_otlp(cli.log_level, "tvix.store")?;
} else {
tvix_tracing::init(cli.log_level)?;
}
}
#[cfg(not(feature = "otlp"))]
{
tvix_tracing::init(cli.log_level)?;
}
match cli.command { match cli.command {
Commands::Daemon { Commands::Daemon {
listen_address, listen_address,
@ -528,3 +511,37 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}; };
Ok(()) Ok(())
} }
#[tokio::main]
#[instrument(fields(indicatif.pb_show=1))]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let cli = Cli::parse();
let tracing_handle = {
let mut builder = tvix_tracing::TracingBuilder::default();
builder = builder.level(cli.log_level);
#[cfg(feature = "otlp")]
{
if cli.otlp {
builder = builder.enable_otlp("tvix.store");
}
}
builder.build()?
};
tokio::select! {
res = tokio::signal::ctrl_c() => {
res?;
if let Err(e) = tracing_handle.force_shutdown().await {
eprintln!("failed to shutdown tracing: {e}");
}
Ok(())
},
res = run_cli(cli) => {
if let Err(e) = tracing_handle.shutdown().await {
eprintln!("failed to shutdown tracing: {e}");
}
res
}
}
}

View file

@ -9,6 +9,8 @@ tracing = { version = "0.1.40", features = ["max_level_trace", "release_max_leve
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
indicatif = "0.17.8" indicatif = "0.17.8"
tracing-indicatif = "0.3.6" tracing-indicatif = "0.3.6"
tokio = { version = "1.32.0" , features = ["sync", "rt"] }
thiserror = "1.0.38"
tracing-opentelemetry = { version = "0.23.0", optional = true } tracing-opentelemetry = { version = "0.23.0", optional = true }
opentelemetry = { version = "0.22.0", optional = true } opentelemetry = { version = "0.22.0", optional = true }

View file

@ -1,5 +1,6 @@
use indicatif::ProgressStyle; use indicatif::ProgressStyle;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use tokio::sync::{mpsc, oneshot};
use tracing::Level; use tracing::Level;
use tracing_indicatif::{filter::IndicatifFilter, IndicatifLayer}; use tracing_indicatif::{filter::IndicatifFilter, IndicatifLayer};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
@ -24,20 +25,136 @@ lazy_static! {
.expect("invalid progress template"); .expect("invalid progress template");
} }
// using a macro_rule here because of the complex return type #[derive(thiserror::Error, Debug)]
macro_rules! init_base_subscriber { pub enum Error {
($level: expr) => {{ #[error(transparent)]
let indicatif_layer = IndicatifLayer::new().with_progress_style(PB_SPINNER_STYLE.clone()); Init(#[from] tracing_subscriber::util::TryInitError),
#[error(transparent)]
MpscSend(#[from] mpsc::error::SendError<Option<oneshot::Sender<()>>>),
#[error(transparent)]
OneshotRecv(#[from] oneshot::error::RecvError),
}
#[derive(Clone)]
pub struct TracingHandle {
tx: Option<mpsc::Sender<Option<oneshot::Sender<()>>>>,
}
impl TracingHandle {
/// This will flush possible attached tracing providers, e.g. otlp exported, if enabled.
/// If there is none enabled this will result in a noop.
///
/// It will not wait until the flush is complete, but you can pass in an oneshot::Sender which
/// will receive a message once the flush is completed.
pub async fn flush(&self, msg: Option<oneshot::Sender<()>>) -> Result<(), Error> {
if let Some(tx) = &self.tx {
Ok(tx.send(msg).await?)
} else {
// If we have a message passed in we need to notify the receiver
if let Some(tx) = msg {
let _ = tx.send(());
}
Ok(())
}
}
/// This will flush all all attached tracing providers and will wait until the flush is completed.
/// If no tracing providers like otlp are attached then this will be a noop.
///
/// This should only be called on a regular shutdown.
/// If you correctly need to shutdown tracing on ctrl_c use [force_shutdown](#method.force_shutdown)
/// otherwise you will get otlp errors.
pub async fn shutdown(&self) -> Result<(), Error> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.flush(Some(tx)).await?;
rx.await?;
Ok(())
}
/// This will flush all all attached tracing providers and will wait until the flush is completed.
/// After this it will do some other necessary cleanup.
/// If no tracing providers like otlp are attached then this will be a noop.
///
/// This should only be used if the tool received an ctrl_c otherwise you will get otlp errors.
/// If you need to shutdown tracing on a regular exit, you should use the [shutdown](#method.shutdown)
/// method.
pub async fn force_shutdown(&self) -> Result<(), Error> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.flush(Some(tx)).await?;
rx.await?;
#[cfg(feature = "otlp")]
{
// Because of a bug within otlp we currently have to use spawn_blocking otherwise
// calling `shutdown_tracer_provider` can block forever. See
// https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335
//
// This still throws an error, if the tool exits regularly: "OpenTelemetry trace error
// occurred. oneshot canceled", but not having this leads to errors if we cancel with
// ctrl_c.
// So this should right now only be used on ctrl_c, for a regular exit use the
// [shutdown](#shutdown) method
let _ = tokio::task::spawn_blocking(move || {
opentelemetry::global::shutdown_tracer_provider();
})
.await;
}
Ok(())
}
}
pub struct TracingBuilder {
level: Level,
#[cfg(feature = "otlp")]
service_name: Option<&'static str>,
}
impl Default for TracingBuilder {
fn default() -> Self {
TracingBuilder {
level: Level::INFO,
#[cfg(feature = "otlp")]
service_name: None,
}
}
}
impl TracingBuilder {
/// Set the log level of the stderr writer, RUST_LOG still has a higher priority over this
pub fn level(mut self, level: Level) -> TracingBuilder {
self.level = level;
self
}
#[cfg(feature = "otlp")]
/// Enable otlp by setting a custom service_name
pub fn enable_otlp(mut self, service_name: &'static str) -> TracingBuilder {
self.service_name = Some(service_name);
self
}
/// This will setup tracing based on the configuration passed in.
/// It will setup a stderr writer with the provided log level as filter (RUST_LOG still has a
/// higher priority over the configured value)
///
/// It will also configure otlp if the feature is enabled and a service_name was provided. It
/// will then correctly setup a channel which is later used for flushing the provider.
pub fn build(self) -> Result<TracingHandle, Error> {
// Set up the tracing subscriber. // Set up the tracing subscriber.
tracing_subscriber::registry() let indicatif_layer = IndicatifLayer::new().with_progress_style(PB_SPINNER_STYLE.clone());
let subscriber = tracing_subscriber::registry()
.with( .with(
tracing_subscriber::fmt::Layer::new() tracing_subscriber::fmt::Layer::new()
.with_writer(indicatif_layer.get_stderr_writer()) .with_writer(indicatif_layer.get_stderr_writer())
.compact() .compact()
.with_filter( .with_filter(
EnvFilter::builder() EnvFilter::builder()
.with_default_directive($level.into()) .with_default_directive(self.level.into())
.from_env() .from_env()
.expect("invalid RUST_LOG"), .expect("invalid RUST_LOG"),
), ),
@ -45,46 +162,73 @@ macro_rules! init_base_subscriber {
.with(indicatif_layer.with_filter( .with(indicatif_layer.with_filter(
// only show progress for spans with indicatif.pb_show field being set // only show progress for spans with indicatif.pb_show field being set
IndicatifFilter::new(false), IndicatifFilter::new(false),
)) ));
}};
}
pub fn init(level: Level) -> Result<(), tracing_subscriber::util::TryInitError> { // Setup otlp if a service_name is configured
init_base_subscriber!(level).try_init() #[cfg(feature = "otlp")]
} {
if let Some(service_name) = self.service_name {
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(opentelemetry_otlp::new_exporter().tonic())
.with_batch_config(BatchConfig::default())
.with_trace_config(opentelemetry_sdk::trace::config().with_resource({
// use SdkProvidedResourceDetector.detect to detect resources,
// but replace the default service name with our default.
// https://github.com/open-telemetry/opentelemetry-rust/issues/1298
let resources =
SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0));
// SdkProvidedResourceDetector currently always sets
// `service.name`, but we don't like its default.
if resources.get("service.name".into()).unwrap() == "unknown_service".into()
{
resources.merge(&Resource::new([KeyValue::new(
"service.name",
service_name,
)]))
} else {
resources
}
}))
.install_batch(opentelemetry_sdk::runtime::Tokio)
.expect("Failed to install batch exporter using Tokio");
#[cfg(feature = "otlp")] // Trace provider is need for later use like flushing the provider.
pub fn init_with_otlp( // Needs to be kept around for each message to rx we need to handle.
level: Level, let tracer_provider = tracer
service_name: &'static str, .provider()
) -> Result<(), tracing_subscriber::util::TryInitError> { .expect("Failed to get the tracer provider");
let subscriber = init_base_subscriber!(level);
let tracer = opentelemetry_otlp::new_pipeline() // Set up a channel for flushing trace providers later
.tracing() let (tx, mut rx) = mpsc::channel::<Option<oneshot::Sender<()>>>(16);
.with_exporter(opentelemetry_otlp::new_exporter().tonic())
.with_batch_config(BatchConfig::default()) // Spawning a task that listens on rx for any message. Once we receive a message we
.with_trace_config(opentelemetry_sdk::trace::config().with_resource({ // correctly call flush on the tracer_provider.
// use SdkProvidedResourceDetector.detect to detect resources, tokio::spawn(async move {
// but replace the default service name with our default. while let Some(m) = rx.recv().await {
// https://github.com/open-telemetry/opentelemetry-rust/issues/1298 // Because of a bug within otlp we currently have to use spawn_blocking
let resources = SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0)); // otherwise will calling `force_flush` block forever, especially if the
// SdkProvidedResourceDetector currently always sets // tool was closed with ctrl_c. See
// `service.name`, but we don't like its default. // https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335
if resources.get("service.name".into()).unwrap() == "unknown_service".into() { let _ = tokio::task::spawn_blocking({
resources.merge(&Resource::new([KeyValue::new( let tracer_provider = tracer_provider.clone();
"service.name", move || tracer_provider.force_flush()
service_name, })
)])) .await;
} else { if let Some(tx) = m {
resources let _ = tx.send(());
}
}
});
// Create a tracing layer with the configured tracer
let layer = tracing_opentelemetry::layer().with_tracer(tracer);
subscriber.with(Some(layer)).try_init()?;
return Ok(TracingHandle { tx: Some(tx) });
} }
})) }
.install_batch(opentelemetry_sdk::runtime::Tokio)
.expect("Failed to install tokio runtime");
// Create a tracing layer with the configured tracer subscriber.try_init()?;
let layer = tracing_opentelemetry::layer().with_tracer(tracer); Ok(TracingHandle { tx: None })
}
subscriber.with(Some(layer)).try_init()
} }