fix(tvix/castore/grpc/directory): skip_all fields in instrument

This only contains the outer metadata wrapping, and that's not too interesting:

> Request { metadata: MetadataMap { headers: {"content-type":
> "application/grpc", "user-agent": "grpc-go/1.60.1", "te": "trailers",
> "grpc-accept-encoding": "gzip"} }, message: Streaming, extensions:
> Extensions }

Drop these fields for now, and rely on the underlying implementations to
add instrumentation for the application-specific fields.

Clean up the error logging a bit.

Change-Id: Ife1090ed411766a61e1fa60fd4c9570f38de1e98
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11102
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: Connor Brewster <cbrewster@hey.com>
This commit is contained in:
Florian Klink 2024-03-08 23:02:47 +02:00 committed by clbot
parent 05deb37f44
commit 4e78de7393

View file

@ -25,7 +25,7 @@ where
{ {
type GetStream = ReceiverStream<tonic::Result<proto::Directory, Status>>; type GetStream = ReceiverStream<tonic::Result<proto::Directory, Status>>;
#[instrument(skip(self))] #[instrument(skip_all)]
async fn get( async fn get(
&self, &self,
request: Request<proto::GetDirectoryRequest>, request: Request<proto::GetDirectoryRequest>,
@ -44,13 +44,19 @@ where
.map_err(|_e| Status::invalid_argument("invalid digest length"))?; .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
if !req_inner.recursive { if !req_inner.recursive {
let e: Result<proto::Directory, Status> = let e: Result<proto::Directory, Status> = match self
match self.directory_service.get(&digest).await { .directory_service
.get(&digest)
.await
{
Ok(Some(directory)) => Ok(directory), Ok(Some(directory)) => Ok(directory),
Ok(None) => { Ok(None) => {
Err(Status::not_found(format!("directory {} not found", digest))) Err(Status::not_found(format!("directory {} not found", digest)))
} }
Err(e) => Err(e.into()), Err(e) => {
warn!(err = %e, directory.digest=%digest, "failed to get directory");
Err(e.into())
}
}; };
if tx.send(e).await.is_err() { if tx.send(e).await.is_err() {
@ -76,7 +82,7 @@ where
Ok(Response::new(receiver_stream)) Ok(Response::new(receiver_stream))
} }
#[instrument(skip(self, request))] #[instrument(skip_all)]
async fn put( async fn put(
&self, &self,
request: Request<Streaming<proto::Directory>>, request: Request<Streaming<proto::Directory>>,
@ -98,11 +104,11 @@ where
while let Some(directory) = req_inner.message().await? { while let Some(directory) = req_inner.message().await? {
// validate the directory itself. // validate the directory itself.
if let Err(e) = directory.validate() { if let Err(e) = directory.validate() {
return Err(Status::invalid_argument(format!( warn!(err = %e, "invalid directory");
"directory {} failed validation: {}", Err(Status::invalid_argument(format!(
directory.digest(), "directory failed validation: {}",
e, e,
))); )))?;
} }
// for each child directory this directory refers to, we need // for each child directory this directory refers to, we need
@ -154,7 +160,7 @@ where
// inserting if it's already there, as that'd be a no-op. // inserting if it's already there, as that'd be a no-op.
match self.directory_service.get(&dgst).await { match self.directory_service.get(&dgst).await {
Err(e) => { Err(e) => {
warn!("error checking if directory already exists: {}", e); warn!(err = %e, "failed to check if directory already exists");
return Err(e.into()); return Err(e.into());
} }
// skip if already exists // skip if already exists