feat(tvix/castore): instrument tokio task with current span

By default tokio::spawn does not instrument the spawned task with the
current spawn (https://github.com/tokio-rs/tokio/discussions/6008), do
this manually for all tokio::spawn functions in functions that are
instrumented.

Change-Id: I83dd8145b3a62421454aff57d34180cebbee8304
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11864
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
Autosubmit: Simon Hauser <simon.hauser@helsinki-systems.de>
This commit is contained in:
Simon Hauser 2024-06-20 16:10:12 +02:00
parent bd8d74a3ee
commit 2b20d8d82d
3 changed files with 27 additions and 18 deletions

View file

@ -18,7 +18,7 @@ use tokio_util::{
sync::PollSender,
};
use tonic::{async_trait, transport::Channel, Code, Status};
use tracing::instrument;
use tracing::{instrument, Instrument as _};
/// Connects to a (remote) tvix-store BlobService over gRPC.
#[derive(Clone)]
@ -133,6 +133,8 @@ impl BlobService for GRPCBlobService {
let task = tokio::spawn({
let mut grpc_client = self.grpc_client.clone();
async move { Ok::<_, Status>(grpc_client.put(blobchunk_stream).await?.into_inner()) }
// instrument the task with the current span, this is not done by default
.in_current_span()
});
// The tx part of the channel is converted to a sink of byte chunks.

View file

@ -12,7 +12,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::async_trait;
use tonic::Code;
use tonic::{transport::Channel, Status};
use tracing::{instrument, warn};
use tracing::{instrument, warn, Instrument as _};
/// Connects to a (remote) tvix-store DirectoryService over gRPC.
#[derive(Clone)]
@ -194,14 +194,17 @@ impl DirectoryService for GRPCDirectoryService {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> = spawn(async move {
let s = grpc_client
.put(UnboundedReceiverStream::new(rx))
.await?
.into_inner();
let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> = spawn(
async move {
let s = grpc_client
.put(UnboundedReceiverStream::new(rx))
.await?
.into_inner();
Ok(s)
});
Ok(s)
} // instrument the task with the current span, this is not done by default
.in_current_span(),
);
Box::new(GRPCPutter {
rq: Some((task, tx)),

View file

@ -43,7 +43,7 @@ use tokio::{
io::{AsyncReadExt, AsyncSeekExt},
sync::mpsc,
};
use tracing::{debug, error, instrument, warn, Span};
use tracing::{debug, error, instrument, warn, Instrument as _, Span};
/// This implements a read-only FUSE filesystem for a tvix-store
/// with the passed [BlobService], [DirectoryService] and [RootNodes].
@ -397,16 +397,20 @@ where
// This task will run in the background immediately and will exit
// after the stream ends or if we no longer want any more entries.
self.tokio_handle.spawn(async move {
let mut stream = root_nodes_provider.list().enumerate();
while let Some(node) = stream.next().await {
if tx.send(node).await.is_err() {
// If we get a send error, it means the sync code
// doesn't want any more entries.
break;
self.tokio_handle.spawn(
async move {
let mut stream = root_nodes_provider.list().enumerate();
while let Some(node) = stream.next().await {
if tx.send(node).await.is_err() {
// If we get a send error, it means the sync code
// doesn't want any more entries.
break;
}
}
}
});
// instrument the task with the current span, this is not done by default
.in_current_span(),
);
// Put the rx part into [self.dir_handles].
// TODO: this will overflow after 2**64 operations,