feat(tvix/castore/blobsvc): add chunks method

This adds support for retrieving the list of chunks of a given blob to
the BlobService interface.

While in theory all chunk-awareness could be kept private inside each
BlobService reader, that would make it impossible to resolve individual
chunks across different BlobServices - and, as a result, impossible to
substitute chunks we already have in a more local store.

This method lets a caller ask a BlobService for the list of chunks of a
blob, leaving the actual fetching up to the caller, be it through
individual calls to open_read, or by asking another store for them.
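
As a rough sketch of the intended call pattern (not part of this change;
it assumes the crate is consumed as tvix_castore, and that a ChunkMeta's
digest bytes convert into a B3Digest via TryFrom), a caller could ask a
remote store for the chunk list and probe a more local store for each
chunk before fetching anything:

    use std::io;

    use tvix_castore::blobservice::BlobService;
    use tvix_castore::B3Digest;

    /// Hypothetical helper: which chunks of `blob_digest` are missing locally?
    async fn missing_chunks(
        remote: &dyn BlobService,
        local: &dyn BlobService,
        blob_digest: &B3Digest,
    ) -> io::Result<Vec<B3Digest>> {
        let mut missing = Vec::new();

        // Ok(None) means the remote doesn't have the blob at all;
        // Ok(Some(vec![])) means it does, but exposes no finer-grained chunks.
        if let Some(chunks) = remote.chunks(blob_digest).await? {
            for chunk in chunks {
                // Assumption: the chunk digest bytes convert into a B3Digest.
                let chunk_digest: B3Digest = chunk
                    .digest
                    .try_into()
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;

                // Chunks are content-addressed blobs themselves, so a more
                // local store can be probed for each of them individually.
                if !local.has(&chunk_digest).await? {
                    missing.push(chunk_digest);
                }
            }
        }

        Ok(missing)
    }

Anything reported as missing can then be fetched through open_read, from
whichever store is cheapest to reach.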

Change-Id: I1d33c591195ed494be3aec71a8c804743cbe0dca
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10586
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Tested-by: BuildkiteCI
Author: Florian Klink, 2024-01-09 15:00:01 +02:00 (committed by clbot)
commit 719cbad871 (parent 9596c5caff)
2 changed files with 47 additions and 2 deletions

@@ -1,5 +1,8 @@
 use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter};
-use crate::{proto, B3Digest};
+use crate::{
+    proto::{self, stat_blob_response::ChunkMeta},
+    B3Digest,
+};
 use futures::sink::SinkExt;
 use std::{io, pin::pin, task::Poll};
 use tokio::io::AsyncWriteExt;
@@ -10,7 +13,7 @@ use tokio_util::{
     sync::PollSender,
 };
 use tonic::{async_trait, transport::Channel, Code, Status};
-use tracing::instrument;
+use tracing::{instrument, warn};

 /// Connects to a (remote) tvix-store BlobService over gRPC.
 #[derive(Clone)]
@@ -108,6 +111,31 @@ impl BlobService for GRPCBlobService {
             digest: None,
         })
     }
+
+    #[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
+    async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
+        let resp = self
+            .grpc_client
+            .clone()
+            .stat(proto::StatBlobRequest {
+                digest: digest.clone().into(),
+                send_chunks: true,
+                ..Default::default()
+            })
+            .await;
+
+        match resp {
+            Err(e) if e.code() == Code::NotFound => Ok(None),
+            Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
+            Ok(resp) => {
+                let resp = resp.into_inner();
+                if resp.chunks.is_empty() {
+                    warn!("chunk list is empty");
+                }
+                Ok(Some(resp.chunks))
+            }
+        }
+    }
 }

 pub struct GRPCBlobWriter<W: tokio::io::AsyncWrite> {

@@ -1,6 +1,7 @@
 use std::io;
 use tonic::async_trait;

+use crate::proto::stat_blob_response::ChunkMeta;
 use crate::B3Digest;

 mod from_addr;
@@ -35,6 +36,22 @@ pub trait BlobService: Send + Sync {
     /// Insert a new blob into the store. Returns a [BlobWriter], which
     /// implements [io::Write] and a [BlobWriter::close].
     async fn open_write(&self) -> Box<dyn BlobWriter>;
+
+    /// Return a list of chunks for a given blob.
+    /// There's a distinction between returning Ok(None) and Ok(Some(vec![])).
+    /// The former return value is sent in case the blob is not present at all,
+    /// while the second one is sent in case there's no more granular chunks (or
+    /// the backend does not support chunking).
+    /// A default implementation signalling the backend does not support
+    /// chunking is provided.
+    async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
+        if !self.has(digest).await? {
+            return Ok(None);
+        } else {
+            // default implementation, signalling the backend does not support chunking.
+            return Ok(Some(vec![]));
+        }
+    }
 }

 /// A [tokio::io::AsyncWrite] that you need to close() afterwards, and get back
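
To make the Ok(None) / Ok(Some(vec![])) contract above concrete, here is
a small hypothetical caller-side helper (FetchPlan and plan_fetch are
illustrative names only, assuming the proto types are reachable as
tvix_castore::proto::stat_blob_response::ChunkMeta):

    use std::io;

    use tvix_castore::blobservice::BlobService;
    use tvix_castore::proto::stat_blob_response::ChunkMeta;
    use tvix_castore::B3Digest;

    /// How a caller may decide to fetch a blob, based on its chunk list.
    enum FetchPlan {
        /// The store doesn't have the blob at all.
        NotFound,
        /// The store has the blob, but exposes no finer-grained chunks
        /// (or doesn't support chunking): read it in one piece.
        Whole,
        /// The store exposes a chunk list: fetch (or substitute) per chunk.
        Chunked(Vec<ChunkMeta>),
    }

    async fn plan_fetch(svc: &dyn BlobService, digest: &B3Digest) -> io::Result<FetchPlan> {
        Ok(match svc.chunks(digest).await? {
            None => FetchPlan::NotFound,
            Some(chunks) if chunks.is_empty() => FetchPlan::Whole,
            Some(chunks) => FetchPlan::Chunked(chunks),
        })
    }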