feat(tvix/castore/blobsvc/grpc): read data in chunks

Whenever this encounters an open_read(), it'll first check for more
granular chunking. If there's more granular chunking data available, a
ChunkedReader is constructed (which supports seeking backwards).

This currently is still a bit stupid, and doesn't compose, as
`ChunkedReader` uses `self` as the `BlobService` to ask for the
individual chunks.

In store composition future, we might want to compose this differently,
essentially constructing `ChunkedReader` with another `BlobService`
representing the entire hierarchy, so there's a chance to locally cache
things, and do less requests.

Change-Id: I22e0df4d6245f666d083b4f0b7114d3ac41d1dce
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11185
Tested-by: BuildkiteCI
Reviewed-by: Connor Brewster <cbrewster@hey.com>
This commit is contained in:
Florian Klink 2024-03-18 14:33:08 +02:00 committed by flokli
parent 50c81d7838
commit 82f8ce8b7d

View file

@ -1,10 +1,10 @@
use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter}; use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter, ChunkedReader};
use crate::{ use crate::{
proto::{self, stat_blob_response::ChunkMeta}, proto::{self, stat_blob_response::ChunkMeta},
B3Digest, B3Digest,
}; };
use futures::sink::SinkExt; use futures::sink::SinkExt;
use std::{io, pin::pin, task::Poll}; use std::{io, pin::pin, sync::Arc, task::Poll};
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tokio_stream::{wrappers::ReceiverStream, StreamExt};
@ -54,31 +54,59 @@ impl BlobService for GRPCBlobService {
#[instrument(skip(self, digest), fields(blob.digest=%digest), err)] #[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
// Get a stream of [proto::BlobChunk], or return an error if the blob // First try to get a list of chunks. In case there's only one chunk returned,
// doesn't exist. // or the backend does not support chunking, return a NaiveSeeker.
match self // Otherwise use a ChunkedReader.
.grpc_client // TODO: we should check if we want to replace NaiveSeeker with a simple
.clone() // Cursor on the raw `Vec<u8>`, as seeking backwards is something that
.read(proto::ReadBlobRequest { // clients generally do.
digest: digest.clone().into(), match self.chunks(digest).await {
}) Ok(None) => Ok(None),
.await Ok(Some(chunks)) => {
{ if chunks.is_empty() || chunks.len() == 1 {
Ok(stream) => { // No more granular chunking info, treat this as an individual chunk.
// on success, this is a stream of tonic::Result<proto::BlobChunk>, // Get a stream of [proto::BlobChunk], or return an error if the blob
// so access .data and map errors into std::io::Error. // doesn't exist.
let data_stream = stream.into_inner().map(|e| { return match self
e.map(|c| c.data) .grpc_client
.map_err(|s| std::io::Error::new(io::ErrorKind::InvalidData, s)) .clone()
}); .read(proto::ReadBlobRequest {
digest: digest.clone().into(),
})
.await
{
Ok(stream) => {
let data_stream = stream.into_inner().map(|e| {
e.map(|c| c.data)
.map_err(|s| std::io::Error::new(io::ErrorKind::InvalidData, s))
});
// Use StreamReader::new to convert to an AsyncRead. // Use StreamReader::new to convert to an AsyncRead.
let data_reader = tokio_util::io::StreamReader::new(data_stream); let data_reader = tokio_util::io::StreamReader::new(data_stream);
Ok(Some(Box::new(NaiveSeeker::new(data_reader)))) Ok(Some(Box::new(NaiveSeeker::new(data_reader))))
}
Err(e) if e.code() == Code::NotFound => Ok(None),
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
};
}
// The chunked case. Let ChunkedReader do individual reads.
// TODO: we should store the chunking data in some local cache,
// so `ChunkedReader` doesn't call `self.chunks` *again* for every chunk.
// Think about how store composition will fix this.
let chunked_reader = ChunkedReader::from_chunks(
chunks.into_iter().map(|chunk| {
(
chunk.digest.try_into().expect("invalid b3 digest"),
chunk.size,
)
}),
Arc::new(self.clone()) as Arc<dyn BlobService>,
);
Ok(Some(Box::new(chunked_reader)))
} }
Err(e) if e.code() == Code::NotFound => Ok(None), Err(e) => Err(e)?,
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
} }
} }