feat(tvix/store/pathinfo/*): add more instrumentation

Add instrumentation to the get() and put() implementations of all
PathInfoService.

Use the nixbase32 representation of the digest, not the base64 one.

Change-Id: Iea79bbd363bf20f23985e877c6fc1793bbee6a7e
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11630
Reviewed-by: picnoir picnoir <picnoir@alternativebit.fr>
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
This commit is contained in:
Florian Klink 2024-05-12 10:29:58 +03:00 committed by clbot
parent 8324913045
commit ed2c0be920
6 changed files with 20 additions and 11 deletions

View file

@ -6,11 +6,12 @@ use bigtable_rs::{bigtable, google::bigtable::v2 as bigtable_v2};
use bytes::Bytes;
use data_encoding::HEXLOWER;
use futures::stream::BoxStream;
use nix_compat::nixbase32;
use prost::Message;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DurationSeconds};
use tonic::async_trait;
use tracing::trace;
use tracing::{instrument, trace};
use tvix_castore::Error;
/// There should not be more than 10 MiB in a single cell.
@ -181,6 +182,7 @@ fn derive_pathinfo_key(digest: &[u8; 20]) -> String {
#[async_trait]
impl PathInfoService for BigtablePathInfoService {
#[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
let mut client = self.client.clone();
let path_info_key = derive_pathinfo_key(&digest);
@ -277,6 +279,7 @@ impl PathInfoService for BigtablePathInfoService {
Ok(Some(path_info))
}
#[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
let store_path = path_info
.validate()

View file

@ -4,8 +4,8 @@ use crate::{
proto::{self, ListPathInfoRequest, PathInfo},
};
use async_stream::try_stream;
use data_encoding::BASE64;
use futures::stream::BoxStream;
use nix_compat::nixbase32;
use tonic::{async_trait, transport::Channel, Code};
use tracing::instrument;
use tvix_castore::{proto as castorepb, Error};
@ -30,7 +30,7 @@ impl GRPCPathInfoService {
#[async_trait]
impl PathInfoService for GRPCPathInfoService {
#[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest)))]
#[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
let path_info = self
.grpc_client

View file

@ -1,11 +1,12 @@
use std::num::NonZeroUsize;
use std::sync::Arc;
use tokio::sync::RwLock;
use async_stream::try_stream;
use futures::stream::BoxStream;
use lru::LruCache;
use nix_compat::nixbase32;
use std::num::NonZeroUsize;
use std::sync::Arc;
use tokio::sync::RwLock;
use tonic::async_trait;
use tracing::instrument;
use crate::proto::PathInfo;
use tvix_castore::Error;
@ -26,10 +27,12 @@ impl LruPathInfoService {
#[async_trait]
impl PathInfoService for LruPathInfoService {
#[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
Ok(self.lru.write().await.get(&digest).cloned())
}
#[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
// call validate
let store_path = path_info

View file

@ -2,9 +2,11 @@ use super::PathInfoService;
use crate::proto::PathInfo;
use async_stream::try_stream;
use futures::stream::BoxStream;
use nix_compat::nixbase32;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;
use tonic::async_trait;
use tracing::instrument;
use tvix_castore::Error;
#[derive(Default)]
@ -14,6 +16,7 @@ pub struct MemoryPathInfoService {
#[async_trait]
impl PathInfoService for MemoryPathInfoService {
#[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
let db = self.db.read().await;
@ -23,6 +26,7 @@ impl PathInfoService for MemoryPathInfoService {
}
}
#[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
// Call validate on the received PathInfo message.
match path_info.validate() {

View file

@ -1,4 +1,3 @@
use data_encoding::BASE64;
use futures::{stream::BoxStream, TryStreamExt};
use nix_compat::{
narinfo::{self, NarInfo},
@ -71,7 +70,7 @@ where
BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static,
DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static,
{
#[instrument(skip_all, err, fields(path.digest=BASE64.encode(&digest)))]
#[instrument(skip_all, err, fields(path.digest=nixbase32::encode(&digest)))]
async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
let narinfo_url = self
.base_url

View file

@ -1,8 +1,8 @@
use super::PathInfoService;
use crate::proto::PathInfo;
use async_stream::try_stream;
use data_encoding::BASE64;
use futures::stream::BoxStream;
use nix_compat::nixbase32;
use prost::Message;
use std::path::Path;
use tonic::async_trait;
@ -38,7 +38,7 @@ impl SledPathInfoService {
#[async_trait]
impl PathInfoService for SledPathInfoService {
#[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest)))]
#[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
let resp = tokio::task::spawn_blocking({
let db = self.db.clone();